diff --git a/README.md b/README.md index adcc4ffb..09b48551 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,7 @@ For programmatic access, add SoftClient4ES to your project: resolvers += "Softnetwork" at "https://softnetwork.jfrog.io/artifactory/releases/" // Choose your Elasticsearch version -libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.1" +libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.2" // Add the community extensions for materialized views (optional) libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-community-extensions" % "0.1.0" ``` diff --git a/build.sbt b/build.sbt index 37d17813..fa869f5c 100644 --- a/build.sbt +++ b/build.sbt @@ -20,7 +20,7 @@ ThisBuild / organization := "app.softnetwork" name := "softclient4es" -ThisBuild / version := "0.17.1" +ThisBuild / version := "0.17.2" ThisBuild / scalaVersion := scala213 @@ -222,7 +222,10 @@ def testkitProject(esVersion: String, ss: Def.SettingsDefinition*): Project = { // "org.apache.logging.log4j" % "log4j-slf4j-impl" % Versions.log4j, "org.apache.logging.log4j" % "log4j-core" % Versions.log4j, "app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence, - "org.testcontainers" % "testcontainers-elasticsearch" % Versions.testContainers excludeAll (jacksonExclusions: _*) + "org.testcontainers" % "testcontainers-elasticsearch" % Versions.testContainers excludeAll (jacksonExclusions: _*), + "org.testcontainers" % "testcontainers-minio" % Versions.testContainers, + // Required at test runtime for COPY INTO ... FROM 's3a://...' 
tests via MinioTestKit + // "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j: _*) ), Compile / compile := (Compile / compile).dependsOn(copyTestkit(esVersion)).value ) diff --git a/core/build.sbt b/core/build.sbt index 49468076..2c09af23 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -34,7 +34,20 @@ val mockito = Seq( val avro = Seq( "org.apache.parquet" % "parquet-avro" % "1.15.2" excludeAll (excludeSlf4jAndLog4j *), "org.apache.avro" % "avro" % "1.11.4" excludeAll (excludeSlf4jAndLog4j *), - "org.apache.hadoop" % "hadoop-client" % "3.4.2" excludeAll (excludeSlf4jAndLog4j *) + "org.apache.hadoop" % "hadoop-client" % Versions.hadoop excludeAll (excludeSlf4jAndLog4j *) +) + +// Cloud storage connectors — provided: must be added to the classpath at runtime / assembly. +// Each connector activates the corresponding URI scheme in HadoopConfigurationFactory: +// s3a:// → hadoop-aws (+ AWS credentials via AWS_* env vars) +// abfs:// → hadoop-azure (+ Azure credentials via AZURE_* env vars) +// gs:// → gcs-connector (+ GCP credentials via GOOGLE_APPLICATION_CREDENTIALS) +val cloudConnectors = Seq( + "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Provided excludeAll (excludeSlf4jAndLog4j *), + "org.apache.hadoop" % "hadoop-azure" % Versions.hadoop % Provided excludeAll (excludeSlf4jAndLog4j *), + "com.google.cloud.bigdataoss" % "gcs-connector" % Versions.gcsConnector % Provided + exclude ("com.google.guava", "guava") + excludeAll (excludeSlf4jAndLog4j *) ) val repl = Seq( @@ -49,7 +62,7 @@ val repl = Seq( ) libraryDependencies ++= akka ++ typesafeConfig ++ http ++ -json4s ++ mockito ++ avro ++ repl :+ "com.google.code.gson" % "gson" % Versions.gson :+ +json4s ++ mockito ++ avro ++ cloudConnectors ++ repl :+ "com.google.code.gson" % "gson" % Versions.gson :+ "com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+ "io.delta" %% "delta-standalone" % Versions.delta :+ "org.scalatest" %% "scalatest" % 
Versions.scalatest % Test diff --git a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala index a083b883..aa1617bf 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala @@ -1260,7 +1260,7 @@ trait IndicesApi extends ElasticClientHelpers { suffixDateKey = suffixKey, suffixDatePattern = suffixPattern, update = Some(doUpdate), - hadoopConf = hadoopConf + hadoopConf = Some(hadoopConf.getOrElse(file.HadoopConfigurationFactory.forPath(source))) )(BulkOptions(defaultIndex = target), system) } yield bulkResult diff --git a/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala b/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala index 2758373e..3da11794 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala @@ -148,7 +148,7 @@ package object file { checkIsFile: Boolean = true )(implicit conf: Configuration): Unit = { val path = new Path(filePath) - val fs = FileSystem.get(conf) + val fs = FileSystem.get(path.toUri, conf) if (!fs.exists(path)) { throw new IllegalArgumentException(s"File does not exist: $filePath") @@ -1144,6 +1144,237 @@ package object file { } } + /** Builds a Hadoop [[Configuration]] suited to the URI scheme of a given path. 
+ * + * Supported schemes: + * - `s3a://`, `s3://` → AWS S3 via S3AFileSystem (reads `AWS_*` env vars) + * - `abfs://`, `abfss://` → Azure ADLS Gen2 (reads `AZURE_*` env vars) + * - `wasb://`, `wasbs://` → Azure Blob Storage (reads `AZURE_*` env vars) + * - `gs://` → Google Cloud Storage (reads `GOOGLE_*` env vars) + * - `hdfs://` → HDFS (auto-loads `HADOOP_CONF_DIR` XML files) + * - anything else / no scheme → local filesystem + * + * Additionally, any `*.xml` files found in `~/.softclient4es/` are loaded on top of the + * auto-detected configuration, allowing per-user overrides. + * + * Cloud connector JARs (`hadoop-aws`, `hadoop-azure`, `gcs-connector`) must be present in the + * classpath at runtime (declared as `provided` dependencies in the build). + */ + object HadoopConfigurationFactory { + + private val factoryLogger: Logger = LoggerFactory.getLogger("HadoopConfigurationFactory") + + /** Returns the value of env var `name`, falling back to the JVM system property of the same + * name. This allows test harnesses to inject credentials via `System.setProperty(...)` when + * environment variables cannot be mutated at runtime. + */ + private def envOrProp(name: String): Option[String] = + sys.env.get(name).orElse(Option(System.getProperty(name))) + + /** Returns a [[Configuration]] appropriate for the URI scheme embedded in `path`. 
*/ + def forPath(path: String): Configuration = { + val scheme = Try(new java.net.URI(path).getScheme).getOrElse(null) + val conf = scheme match { + case "s3a" | "s3" => s3aConf() + case "abfs" | "abfss" | "wasb" | "wasbs" => azureConf() + case "gs" => gcsConf() + case "hdfs" => hdfsConf() + case _ => localConf() + } + loadUserXmlConf(conf) + conf + } + + // ------------------------------------------------------------------------- + // Private builders + // ------------------------------------------------------------------------- + + private def base(): Configuration = { + val conf = new Configuration() + conf.setBoolean("parquet.avro.readInt96AsFixed", true) + conf.setInt("io.file.buffer.size", 65536) + conf.setBoolean("fs.automatic.close", true) + conf + } + + private def localConf(): Configuration = { + val conf = base() + conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") + conf + } + + /** AWS S3 via S3AFileSystem. + * + * Reads (in priority order): + * - `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` / `AWS_SESSION_TOKEN` + * - `AWS_REGION` or `AWS_DEFAULT_REGION` + * - `AWS_ENDPOINT_URL` for S3-compatible stores (MinIO, LocalStack, …) + * + * Falls back to `DefaultAWSCredentialsProviderChain` (IAM roles, `~/.aws/credentials`, …) when + * no explicit credentials are found. 
+ */ + private def s3aConf(): Configuration = { + val conf = base() + conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") + + val hasStaticCreds = envOrProp("AWS_ACCESS_KEY_ID").isDefined + if (hasStaticCreds) { + envOrProp("AWS_ACCESS_KEY_ID").foreach(conf.set("fs.s3a.access.key", _)) + envOrProp("AWS_SECRET_ACCESS_KEY").foreach(conf.set("fs.s3a.secret.key", _)) + envOrProp("AWS_SESSION_TOKEN").foreach(conf.set("fs.s3a.session.token", _)) + } else { + conf.set( + "fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.DefaultAWSCredentialsProviderChain" + ) + } + + envOrProp("AWS_REGION") + .orElse(envOrProp("AWS_DEFAULT_REGION")) + .foreach(conf.set("fs.s3a.endpoint.region", _)) + + envOrProp("AWS_ENDPOINT_URL").foreach { url => + conf.set("fs.s3a.endpoint", url) + conf.setBoolean("fs.s3a.path.style.access", true) + } + + factoryLogger.info("Configured S3A filesystem") + conf + } + + /** Azure ADLS Gen2 (`abfs`/`abfss`) and Azure Blob Storage (`wasb`/`wasbs`). + * + * Authentication is resolved in this order: + * 1. Shared-key: `AZURE_STORAGE_ACCOUNT_NAME` + `AZURE_STORAGE_ACCOUNT_KEY` 2. OAuth2 + * service principal: `AZURE_CLIENT_ID` + `AZURE_CLIENT_SECRET` + `AZURE_TENANT_ID` 3. SAS + * token: `AZURE_STORAGE_ACCOUNT_NAME` + `AZURE_STORAGE_SAS_TOKEN` + */ + private def azureConf(): Configuration = { + val conf = base() + conf.set("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem") + conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem") + conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") + conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure") + + val accountName = sys.env.get("AZURE_STORAGE_ACCOUNT_NAME") + + // 1. 
Shared-key + for { + account <- accountName + key <- sys.env.get("AZURE_STORAGE_ACCOUNT_KEY") + } { + conf.set(s"fs.azure.account.key.$account.dfs.core.windows.net", key) + conf.set(s"fs.azure.account.key.$account.blob.core.windows.net", key) + } + + // 2. OAuth2 service principal + for { + clientId <- sys.env.get("AZURE_CLIENT_ID") + clientSecret <- sys.env.get("AZURE_CLIENT_SECRET") + tenantId <- sys.env.get("AZURE_TENANT_ID") + account <- accountName + } { + conf.set(s"fs.azure.account.auth.type.$account.dfs.core.windows.net", "OAuth") + conf.set( + s"fs.azure.account.oauth.provider.type.$account.dfs.core.windows.net", + "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider" + ) + conf.set( + s"fs.azure.account.oauth2.client.endpoint.$account.dfs.core.windows.net", + s"https://login.microsoftonline.com/$tenantId/oauth2/token" + ) + conf.set( + s"fs.azure.account.oauth2.client.id.$account.dfs.core.windows.net", + clientId + ) + conf.set( + s"fs.azure.account.oauth2.client.secret.$account.dfs.core.windows.net", + clientSecret + ) + } + + // 3. SAS token + for { + account <- accountName + sasToken <- sys.env.get("AZURE_STORAGE_SAS_TOKEN") + } { + conf.set(s"fs.azure.sas.$account.blob.core.windows.net", sasToken) + } + + factoryLogger.info("Configured Azure filesystem (ADLS Gen2 / Blob Storage)") + conf + } + + /** Google Cloud Storage via the GCS Hadoop connector. + * + * Reads: + * - `GOOGLE_APPLICATION_CREDENTIALS` → path to a service-account JSON key file + * - `GOOGLE_CLOUD_PROJECT` → GCS project id (optional) + * + * Falls back to Application Default Credentials (ADC) when no env var is set. 
+ */ + private def gcsConf(): Configuration = { + val conf = base() + conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") + conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") + + sys.env.get("GOOGLE_APPLICATION_CREDENTIALS") match { + case Some(keyFile) => + conf.set("google.cloud.auth.service.account.enable", "true") + conf.set("google.cloud.auth.service.account.json.keyfile", keyFile) + case None => + // Use Application Default Credentials (Workload Identity, gcloud auth, …) + conf.set("google.cloud.auth.type", "APPLICATION_DEFAULT") + } + + sys.env.get("GOOGLE_CLOUD_PROJECT").foreach(conf.set("fs.gs.project.id", _)) + + factoryLogger.info("Configured GCS filesystem") + conf + } + + /** HDFS — the namenode is encoded in the path URI itself (`hdfs://namenode:port/…`). + * + * Also loads `core-site.xml` and `hdfs-site.xml` from `HADOOP_CONF_DIR` when available, and + * propagates `HADOOP_USER_NAME` as a system property. + */ + private def hdfsConf(): Configuration = { + val conf = base() + + sys.env.get("HADOOP_USER_NAME").foreach(System.setProperty("HADOOP_USER_NAME", _)) + + sys.env.get("HADOOP_CONF_DIR").foreach { dir => + Seq("core-site.xml", "hdfs-site.xml").foreach { xml => + val f = new java.io.File(dir, xml) + if (f.exists()) { + factoryLogger.info(s"Loading HDFS config: ${f.getAbsolutePath}") + conf.addResource(new Path(f.getAbsolutePath)) + } + } + } + + factoryLogger.info("Configured HDFS filesystem") + conf + } + + /** Loads every `*.xml` file found in `~/.softclient4es/` on top of the given configuration. + * This lets users override or extend any property set by the auto-detection logic. 
+ */ + private def loadUserXmlConf(conf: Configuration): Unit = { + val userConfDir = new java.io.File(System.getProperty("user.home"), ".softclient4es") + if (userConfDir.isDirectory) { + val xmlFiles = userConfDir.listFiles(f => f.isFile && f.getName.endsWith(".xml")) + if (xmlFiles != null) { + xmlFiles.foreach { f => + factoryLogger.info(s"Loading user Hadoop config override: ${f.getAbsolutePath}") + conf.addResource(new Path(f.getAbsolutePath)) + } + } + } + } + } + /** Factory to get the appropriate FileSource based on file format */ object FileSourceFactory { diff --git a/documentation/sql/dml_statements.md b/documentation/sql/dml_statements.md index d99dab5f..30e7f985 100644 --- a/documentation/sql/dml_statements.md +++ b/documentation/sql/dml_statements.md @@ -324,6 +324,75 @@ ON CONFLICT (uuid) DO UPDATE; --- +### Remote File System Support + +`COPY INTO` transparently supports remote file systems by auto-detecting the URI scheme in the `FROM` path. +No SQL syntax change is required — simply use the appropriate URI scheme. + +| URI scheme | File system | Required JAR | +| --- | --- | --- | +| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` | +| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` | +| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` | +| `hdfs://` | HDFS | _(bundled with hadoop-client)_ | +| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ | + +> **Important:** Cloud connector JARs are declared as `provided` dependencies and are **not bundled** in the library. +> They must be present in the runtime classpath (e.g. added to the CLI assembly or the application's fat-jar). 
+ +#### Credentials configuration + +Authentication is resolved automatically from standard environment variables: + +**AWS S3** +``` +AWS_ACCESS_KEY_ID # access key (falls back to DefaultAWSCredentialsProviderChain) +AWS_SECRET_ACCESS_KEY # secret key +AWS_SESSION_TOKEN # session token (optional) +AWS_REGION # region (or AWS_DEFAULT_REGION) +AWS_ENDPOINT_URL # custom endpoint for S3-compatible stores (MinIO, LocalStack, …) +``` + +**Azure ADLS Gen2 / Blob Storage** +``` +AZURE_STORAGE_ACCOUNT_NAME # storage account name +AZURE_STORAGE_ACCOUNT_KEY # shared key (Option 1) +AZURE_CLIENT_ID # service principal client ID (Option 2 — OAuth2) +AZURE_CLIENT_SECRET # service principal secret (Option 2 — OAuth2) +AZURE_TENANT_ID # Azure tenant ID (Option 2 — OAuth2) +AZURE_STORAGE_SAS_TOKEN # SAS token (Option 3) +``` + +**Google Cloud Storage** +``` +GOOGLE_APPLICATION_CREDENTIALS # path to service-account JSON key file +GOOGLE_CLOUD_PROJECT # GCS project ID (optional) +``` +Falls back to Application Default Credentials (Workload Identity, `gcloud auth`, …) when the variable is absent. + +**HDFS** +``` +HADOOP_CONF_DIR # directory containing core-site.xml and hdfs-site.xml +HADOOP_USER_NAME # Hadoop user name (optional) +``` + +#### Per-user Hadoop overrides + +Any `*.xml` file placed in `~/.softclient4es/` is loaded on top of the auto-detected configuration. +This allows fine-grained property overrides without changing environment variables. 
+ +```xml + + + + fs.s3a.connection.maximum + 200 + + +``` + +--- + ## DML Lifecycle Example ```sql diff --git a/es6/jest/build.sbt b/es6/jest/build.sbt index 6a6bfbfa..f6738713 100644 --- a/es6/jest/build.sbt +++ b/es6/jest/build.sbt @@ -5,7 +5,10 @@ organization := "app.softnetwork.elastic" name := s"softclient4es${elasticSearchMajorVersion(elasticSearchVersion.value)}-jest-client" libraryDependencies ++= jestClientDependencies(elasticSearchVersion.value) ++ - elastic4sDependencies(elasticSearchVersion.value) + elastic4sDependencies(elasticSearchVersion.value) ++ Seq( + // Required at test runtime for COPY INTO ... FROM 's3a://...' tests via MinioTestKit + "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j *) +) val testJavaOptions = { val heapSize = sys.env.getOrElse("HEAP_SIZE", "1g") diff --git a/es6/rest/build.sbt b/es6/rest/build.sbt index 01ed6651..0b7ce6ee 100644 --- a/es6/rest/build.sbt +++ b/es6/rest/build.sbt @@ -5,7 +5,10 @@ organization := "app.softnetwork.elastic" name := s"softclient4es${elasticSearchMajorVersion(elasticSearchVersion.value)}-rest-client" libraryDependencies ++= restClientDependencies(elasticSearchVersion.value) ++ - elastic4sDependencies(elasticSearchVersion.value) + elastic4sDependencies(elasticSearchVersion.value) ++ Seq( + // Required at test runtime for COPY INTO ... FROM 's3a://...' 
tests via MinioTestKit + "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j *) +) val testJavaOptions = { val heapSize = sys.env.getOrElse("HEAP_SIZE", "1g") diff --git a/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientCopyIntoS3Spec.scala b/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientCopyIntoS3Spec.scala new file mode 100644 index 00000000..a5e9a810 --- /dev/null +++ b/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientCopyIntoS3Spec.scala @@ -0,0 +1,21 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi +import app.softnetwork.elastic.scalatest.{ElasticDockerTestKit, MinioTestKit} + +/** Integration test for `COPY INTO ... FROM 's3a://...'` using the ES6 REST high-level client. + * + * Requires Docker to run two containers: + * - Elasticsearch (managed by [[ElasticDockerTestKit]]) + * - MinIO S3-compatible store (managed by [[MinioTestKit]]) + * + * The `hadoop-aws` JAR must be on the test classpath (declared as `% Test` in + * `es6/rest/build.sbt`) for the S3A filesystem to work at runtime. + */ +class RestHighLevelClientCopyIntoS3Spec + extends CopyIntoS3IntegrationSpec + with ElasticDockerTestKit + with MinioTestKit { + + override lazy val client: GatewayApi = new RestHighLevelClientSpi().client(elasticConfig) +} diff --git a/es7/rest/build.sbt b/es7/rest/build.sbt index 01ed6651..0b7ce6ee 100644 --- a/es7/rest/build.sbt +++ b/es7/rest/build.sbt @@ -5,7 +5,10 @@ organization := "app.softnetwork.elastic" name := s"softclient4es${elasticSearchMajorVersion(elasticSearchVersion.value)}-rest-client" libraryDependencies ++= restClientDependencies(elasticSearchVersion.value) ++ - elastic4sDependencies(elasticSearchVersion.value) + elastic4sDependencies(elasticSearchVersion.value) ++ Seq( + // Required at test runtime for COPY INTO ... FROM 's3a://...' 
tests via MinioTestKit + "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j *) +) val testJavaOptions = { val heapSize = sys.env.getOrElse("HEAP_SIZE", "1g") diff --git a/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientCopyIntoS3Spec.scala b/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientCopyIntoS3Spec.scala new file mode 100644 index 00000000..a5e9a810 --- /dev/null +++ b/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientCopyIntoS3Spec.scala @@ -0,0 +1,21 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi +import app.softnetwork.elastic.scalatest.{ElasticDockerTestKit, MinioTestKit} + +/** Integration test for `COPY INTO ... FROM 's3a://...'` using the ES7 REST high-level client. + * + * Requires Docker to run two containers: + * - Elasticsearch (managed by [[ElasticDockerTestKit]]) + * - MinIO S3-compatible store (managed by [[MinioTestKit]]) + * + * The `hadoop-aws` JAR must be on the test classpath (declared as `% Test` in + * `es7/rest/build.sbt`) for the S3A filesystem to work at runtime. + */ +class RestHighLevelClientCopyIntoS3Spec + extends CopyIntoS3IntegrationSpec + with ElasticDockerTestKit + with MinioTestKit { + + override lazy val client: GatewayApi = new RestHighLevelClientSpi().client(elasticConfig) +} diff --git a/es8/java/build.sbt b/es8/java/build.sbt index e23ec0a3..b8c68406 100644 --- a/es8/java/build.sbt +++ b/es8/java/build.sbt @@ -5,7 +5,11 @@ organization := "app.softnetwork.elastic" name := s"softclient4es${elasticSearchMajorVersion(elasticSearchVersion.value)}-java-client" libraryDependencies ++= javaClientDependencies(elasticSearchVersion.value) ++ - elastic4sDependencies(elasticSearchVersion.value) + elastic4sDependencies(elasticSearchVersion.value) ++ + Seq( + // Required at test runtime for COPY INTO ... FROM 's3a://...' 
tests via MinioTestKit + "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j *) + ) val testJavaOptions = { val heapSize = sys.env.getOrElse("HEAP_SIZE", "1g") diff --git a/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaClientCopyIntoS3Spec.scala b/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaClientCopyIntoS3Spec.scala new file mode 100644 index 00000000..9f2ca99b --- /dev/null +++ b/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaClientCopyIntoS3Spec.scala @@ -0,0 +1,21 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JavaClientSpi +import app.softnetwork.elastic.scalatest.{ElasticDockerTestKit, MinioTestKit} + +/** Integration test for `COPY INTO ... FROM 's3a://...'` using the ES8 Java client. + * + * Requires Docker to run two containers: + * - Elasticsearch (managed by [[ElasticDockerTestKit]]) + * - MinIO S3-compatible store (managed by [[MinioTestKit]]) + * + * The `hadoop-aws` JAR must be on the test classpath (declared as `% Test` in + * `es8/java/build.sbt`) for the S3A filesystem to work at runtime. + */ +class JavaClientCopyIntoS3Spec + extends CopyIntoS3IntegrationSpec + with ElasticDockerTestKit + with MinioTestKit { + + override lazy val client: GatewayApi = new JavaClientSpi().client(elasticConfig) +} diff --git a/es9/java/build.sbt b/es9/java/build.sbt index e23ec0a3..58a970d0 100644 --- a/es9/java/build.sbt +++ b/es9/java/build.sbt @@ -5,7 +5,10 @@ organization := "app.softnetwork.elastic" name := s"softclient4es${elasticSearchMajorVersion(elasticSearchVersion.value)}-java-client" libraryDependencies ++= javaClientDependencies(elasticSearchVersion.value) ++ - elastic4sDependencies(elasticSearchVersion.value) + elastic4sDependencies(elasticSearchVersion.value) ++ Seq( + // Required at test runtime for COPY INTO ... FROM 's3a://...' 
tests via MinioTestKit + "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j *) +) val testJavaOptions = { val heapSize = sys.env.getOrElse("HEAP_SIZE", "1g") diff --git a/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaClientCopyIntoS3Spec.scala b/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaClientCopyIntoS3Spec.scala new file mode 100644 index 00000000..9f2ca99b --- /dev/null +++ b/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaClientCopyIntoS3Spec.scala @@ -0,0 +1,21 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JavaClientSpi +import app.softnetwork.elastic.scalatest.{ElasticDockerTestKit, MinioTestKit} + +/** Integration test for `COPY INTO ... FROM 's3a://...'` using the ES9 Java client. + * + * Requires Docker to run two containers: + * - Elasticsearch (managed by [[ElasticDockerTestKit]]) + * - MinIO S3-compatible store (managed by [[MinioTestKit]]) + * + * The `hadoop-aws` JAR must be on the test classpath (declared as `% Test` in + * `es9/java/build.sbt`) for the S3A filesystem to work at runtime. 
+ */ +class JavaClientCopyIntoS3Spec + extends CopyIntoS3IntegrationSpec + with ElasticDockerTestKit + with MinioTestKit { + + override lazy val client: GatewayApi = new JavaClientSpi().client(elasticConfig) +} diff --git a/project/Versions.scala b/project/Versions.scala index 5f60cb1c..e02c0d3e 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -55,4 +55,9 @@ object Versions { val jline = "3.30.6" val fansi = "0.5.1" + + // Cloud storage connectors (provided scope — must be on classpath at runtime) + val hadoop = "3.4.2" // must match hadoop-client in core/build.sbt + + val gcsConnector = "hadoop3-2.2.24" } diff --git a/testkit/build.sbt b/testkit/build.sbt index 61f75182..af7ebf4e 100644 --- a/testkit/build.sbt +++ b/testkit/build.sbt @@ -11,5 +11,6 @@ libraryDependencies ++= elasticClientDependencies(elasticSearchVersion.value) ++ // "org.apache.logging.log4j" % "log4j-slf4j-impl" % Versions.log4j, "org.apache.logging.log4j" % "log4j-core" % Versions.log4j, "app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence, - "org.testcontainers" % "testcontainers-elasticsearch" % Versions.testContainers excludeAll (jacksonExclusions: _*) + "org.testcontainers" % "testcontainers-elasticsearch" % Versions.testContainers excludeAll (jacksonExclusions: _*), + "org.testcontainers" % "testcontainers-minio" % Versions.testContainers ) diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/CopyIntoS3IntegrationSpec.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/CopyIntoS3IntegrationSpec.scala new file mode 100644 index 00000000..5dc2464d --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/CopyIntoS3IntegrationSpec.scala @@ -0,0 +1,156 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.result.DmlResult +import app.softnetwork.elastic.scalatest.{ElasticTestKit, MinioTestKit} + +import java.time.LocalDate + +/** Integration tests for `COPY INTO ... FROM 's3a://...'` backed by a MinIO container. + * + * Mix this trait into a concrete test class together with + * [[app.softnetwork.elastic.scalatest.ElasticDockerTestKit]] and [[MinioTestKit]]: + * + * {{{ + * class JavaClientCopyIntoS3Spec + * extends CopyIntoS3IntegrationSpec + * with ElasticDockerTestKit + * with MinioTestKit { + * override lazy val client: GatewayApi = new JavaClientSpi().client(elasticConfig) + * } + * }}} + * + * The concrete module must declare `hadoop-aws` as a `% Test` dependency so that the S3A + * filesystem implementation is available at test runtime. 
+ */ +trait CopyIntoS3IntegrationSpec extends GatewayIntegrationTestKit { + self: ElasticTestKit with MinioTestKit => + + // --------------------------------------------------------------------------- + // COPY INTO from S3 (MinIO) integration tests + // --------------------------------------------------------------------------- + + behavior of "COPY INTO from S3" + + it should "create the target table for S3 COPY INTO tests" in { + val create = + """CREATE TABLE IF NOT EXISTS copy_into_s3_test ( + | uuid KEYWORD NOT NULL, + | name VARCHAR, + | birthDate DATE, + | score DOUBLE, + | PRIMARY KEY (uuid) + |);""".stripMargin + + assertDdl(System.nanoTime(), client.run(create).futureValue) + } + + it should "support COPY INTO from S3 with JSONL format (auto-detected)" in { + val persons = List( + """{"uuid": "S01", "name": "Leia Organa", "birthDate": "1956-10-21", "score": 9.5}""", + """{"uuid": "S02", "name": "Han Solo", "birthDate": "1942-07-13", "score": 8.0}""", + """{"uuid": "S03", "name": "Luke Skywalker", "birthDate": "1951-09-25", "score": 9.0}""" + ) + + uploadToMinio(persons.mkString("\n"), "persons/data.jsonl") + + val copyInto = + s"""COPY INTO copy_into_s3_test + |FROM 's3a://$minioBucket/persons/data.jsonl';""".stripMargin + + assertDml( + System.nanoTime(), + client.run(copyInto).futureValue, + Some(DmlResult(inserted = persons.size)) + ) + + assertSelectResult( + System.nanoTime(), + client.run("SELECT * FROM copy_into_s3_test ORDER BY uuid ASC").futureValue, + Seq( + Map( + "uuid" -> "S01", + "name" -> "Leia Organa", + "birthDate" -> LocalDate.parse("1956-10-21"), + "score" -> 9.5 + ), + Map( + "uuid" -> "S02", + "name" -> "Han Solo", + "birthDate" -> LocalDate.parse("1942-07-13"), + "score" -> 8.0 + ), + Map( + "uuid" -> "S03", + "name" -> "Luke Skywalker", + "birthDate" -> LocalDate.parse("1951-09-25"), + "score" -> 9.0 + ) + ) + ) + } + + it should "support COPY INTO from S3 with FILE_FORMAT = JSON_ARRAY and ON CONFLICT DO UPDATE" in { + // Updated 
scores — ON CONFLICT DO UPDATE should replace the existing documents. + val updatedPersons = List( + """{"uuid": "S01", "name": "Leia Organa", "birthDate": "1956-10-21", "score": 9.8}""", + """{"uuid": "S02", "name": "Han Solo", "birthDate": "1942-07-13", "score": 8.5}""", + """{"uuid": "S03", "name": "Luke Skywalker", "birthDate": "1951-09-25", "score": 9.2}""" + ) + val jsonArray = s"[\n${updatedPersons.mkString(",\n")}\n]" + + uploadToMinio(jsonArray, "persons/data_updated.json") + + val copyInto = + s"""COPY INTO copy_into_s3_test + |FROM 's3a://$minioBucket/persons/data_updated.json' + |FILE_FORMAT = JSON_ARRAY + |ON CONFLICT (uuid) DO UPDATE;""".stripMargin + + assertDml( + System.nanoTime(), + client.run(copyInto).futureValue, + Some(DmlResult(inserted = updatedPersons.size)) + ) + + assertSelectResult( + System.nanoTime(), + client.run("SELECT * FROM copy_into_s3_test ORDER BY uuid ASC").futureValue, + Seq( + Map( + "uuid" -> "S01", + "name" -> "Leia Organa", + "birthDate" -> LocalDate.parse("1956-10-21"), + "score" -> 9.8 + ), + Map( + "uuid" -> "S02", + "name" -> "Han Solo", + "birthDate" -> LocalDate.parse("1942-07-13"), + "score" -> 8.5 + ), + Map( + "uuid" -> "S03", + "name" -> "Luke Skywalker", + "birthDate" -> LocalDate.parse("1951-09-25"), + "score" -> 9.2 + ) + ) + ) + } +} diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/MinioTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/MinioTestKit.scala new file mode 100644 index 00000000..5a2eaf2c --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/MinioTestKit.scala @@ -0,0 +1,123 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.scalatest + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.scalatest.Suite +import org.testcontainers.containers.MinIOContainer + +/** A test kit trait that starts a MinIO container and provides helper methods for uploading files + * to MinIO via the Hadoop S3A filesystem. + * + * Uses the official Testcontainers MinIO module (`testcontainers-minio`). Mix this trait into a + * concrete test class together with [[ElasticDockerTestKit]]. + * + * The concrete module must declare `hadoop-aws` as a `% Test` dependency so that the + * `S3AFileSystem` implementation is available on the test classpath at runtime. + * + * @example + * {{{ + * class MySpec + * extends CopyIntoS3IntegrationSpec + * with ElasticDockerTestKit + * with MinioTestKit { + * override lazy val client: GatewayApi = new JavaClientSpi().client(elasticConfig) + * } + * }}} + */ +trait MinioTestKit extends ElasticTestKit { _: Suite => + + lazy val minioBucket: String = "copy-into-test" + + lazy val minioContainer: MinIOContainer = + new MinIOContainer("minio/minio:latest") + + def minioEndpoint: String = minioContainer.getS3URL + def minioAccessKey: String = minioContainer.getUserName + def minioSecretKey: String = minioContainer.getPassword + + /** Returns a Hadoop [[Configuration]] pointing at the in-process MinIO container. + * + * The S3AFileSystem implementation (`hadoop-aws`) must be on the classpath at runtime. 
+ */ + def minioHadoopConf(): Configuration = { + val conf = new Configuration() + conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") + conf.set("fs.s3a.access.key", minioAccessKey) + conf.set("fs.s3a.secret.key", minioSecretKey) + conf.set("fs.s3a.endpoint", minioEndpoint) + conf.setBoolean("fs.s3a.path.style.access", true) + conf.set("fs.s3a.connection.ssl.enabled", "false") + conf.set("fs.s3a.attempts.maximum", "3") + conf.set( + "fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" + ) + conf + } + + /** Uploads UTF-8 `content` as object `objectKey` inside [[minioBucket]]. */ + def uploadToMinio(content: String, objectKey: String): Unit = { + val conf = minioHadoopConf() + val objectPath = new Path(s"s3a://$minioBucket/$objectKey") + val fs = FileSystem.get(objectPath.toUri, conf) + val out = fs.create(objectPath, /* overwrite = */ true) + try out.write(content.getBytes("UTF-8")) + finally { + out.close() + fs.close() + } + } + + /** Starts the MinIO container, creates the test bucket, and then starts Elasticsearch. */ + abstract override def start(): Unit = { + minioContainer.start() + + // Expose MinIO credentials as JVM system properties so that + // HadoopConfigurationFactory.s3aConf() picks them up via envOrProp(). + System.setProperty("AWS_ACCESS_KEY_ID", minioAccessKey) + System.setProperty("AWS_SECRET_ACCESS_KEY", minioSecretKey) + System.setProperty("AWS_ENDPOINT_URL", minioEndpoint) + + // Create the test bucket using the mc CLI bundled in the minio/minio image. + // mc connects to localhost:9000 (the internal port) from within the container. 
+ minioContainer.execInContainer( + "mc", + "alias", + "set", + "myminio", + "http://localhost:9000", + minioContainer.getUserName, + minioContainer.getPassword + ) + minioContainer.execInContainer("mc", "mb", "--ignore-existing", s"myminio/$minioBucket") + + super.start() + } + + /** Stops Elasticsearch first, then tears down the MinIO container and clears system properties. + */ + abstract override def stop(): Unit = { + super.stop() + System.clearProperty("AWS_ACCESS_KEY_ID") + System.clearProperty("AWS_SECRET_ACCESS_KEY") + System.clearProperty("AWS_ENDPOINT_URL") + if (minioContainer.isRunning) minioContainer.stop() + } +}