Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ For programmatic access, add SoftClient4ES to your project:
resolvers += "Softnetwork" at "https://softnetwork.jfrog.io/artifactory/releases/"

// Choose your Elasticsearch version
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.1"
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.2"
// Add the community extensions for materialized views (optional)
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-community-extensions" % "0.1.0"
```
Expand Down
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ThisBuild / organization := "app.softnetwork"

name := "softclient4es"

ThisBuild / version := "0.17.1"
ThisBuild / version := "0.17.2"

ThisBuild / scalaVersion := scala213

Expand Down Expand Up @@ -222,7 +222,10 @@ def testkitProject(esVersion: String, ss: Def.SettingsDefinition*): Project = {
// "org.apache.logging.log4j" % "log4j-slf4j-impl" % Versions.log4j,
"org.apache.logging.log4j" % "log4j-core" % Versions.log4j,
"app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence,
"org.testcontainers" % "testcontainers-elasticsearch" % Versions.testContainers excludeAll (jacksonExclusions: _*)
"org.testcontainers" % "testcontainers-elasticsearch" % Versions.testContainers excludeAll (jacksonExclusions: _*),
"org.testcontainers" % "testcontainers-minio" % Versions.testContainers,
// Required at test runtime for COPY INTO ... FROM 's3a://...' tests via MinioTestKit
// "org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Test excludeAll (excludeSlf4jAndLog4j: _*)
),
Compile / compile := (Compile / compile).dependsOn(copyTestkit(esVersion)).value
)
Expand Down
17 changes: 15 additions & 2 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,20 @@ val mockito = Seq(
val avro = Seq(
"org.apache.parquet" % "parquet-avro" % "1.15.2" excludeAll (excludeSlf4jAndLog4j *),
"org.apache.avro" % "avro" % "1.11.4" excludeAll (excludeSlf4jAndLog4j *),
"org.apache.hadoop" % "hadoop-client" % "3.4.2" excludeAll (excludeSlf4jAndLog4j *)
"org.apache.hadoop" % "hadoop-client" % Versions.hadoop excludeAll (excludeSlf4jAndLog4j *)
)

// Cloud storage connectors — provided: must be added to the classpath at runtime / assembly.
// Each connector activates the corresponding URI scheme in HadoopConfigurationFactory:
// s3a:// → hadoop-aws (+ AWS credentials via AWS_* env vars)
// abfs:// → hadoop-azure (+ Azure credentials via AZURE_* env vars)
// gs:// → gcs-connector (+ GCP credentials via GOOGLE_APPLICATION_CREDENTIALS)
val cloudConnectors = Seq(
"org.apache.hadoop" % "hadoop-aws" % Versions.hadoop % Provided excludeAll (excludeSlf4jAndLog4j *),
"org.apache.hadoop" % "hadoop-azure" % Versions.hadoop % Provided excludeAll (excludeSlf4jAndLog4j *),
"com.google.cloud.bigdataoss" % "gcs-connector" % Versions.gcsConnector % Provided
exclude ("com.google.guava", "guava")
excludeAll (excludeSlf4jAndLog4j *)
)

val repl = Seq(
Expand All @@ -49,7 +62,7 @@ val repl = Seq(
)

libraryDependencies ++= akka ++ typesafeConfig ++ http ++
json4s ++ mockito ++ avro ++ repl :+ "com.google.code.gson" % "gson" % Versions.gson :+
json4s ++ mockito ++ avro ++ cloudConnectors ++ repl :+ "com.google.code.gson" % "gson" % Versions.gson :+
"com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+
"io.delta" %% "delta-standalone" % Versions.delta :+
"org.scalatest" %% "scalatest" % Versions.scalatest % Test
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ trait IndicesApi extends ElasticClientHelpers {
suffixDateKey = suffixKey,
suffixDatePattern = suffixPattern,
update = Some(doUpdate),
hadoopConf = hadoopConf
hadoopConf = Some(hadoopConf.getOrElse(file.HadoopConfigurationFactory.forPath(source)))
)(BulkOptions(defaultIndex = target), system)

} yield bulkResult
Expand Down
233 changes: 232 additions & 1 deletion core/src/main/scala/app/softnetwork/elastic/client/file/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ package object file {
checkIsFile: Boolean = true
)(implicit conf: Configuration): Unit = {
val path = new Path(filePath)
val fs = FileSystem.get(conf)
val fs = FileSystem.get(path.toUri, conf)

if (!fs.exists(path)) {
throw new IllegalArgumentException(s"File does not exist: $filePath")
Expand Down Expand Up @@ -1144,6 +1144,237 @@ package object file {
}
}

/** Builds a Hadoop [[Configuration]] suited to the URI scheme of a given path.
*
* Supported schemes:
* - `s3a://`, `s3://` → AWS S3 via S3AFileSystem (reads `AWS_*` env vars)
* - `abfs://`, `abfss://` → Azure ADLS Gen2 (reads `AZURE_*` env vars)
* - `wasb://`, `wasbs://` → Azure Blob Storage (reads `AZURE_*` env vars)
* - `gs://` → Google Cloud Storage (reads `GOOGLE_*` env vars)
* - `hdfs://` → HDFS (auto-loads `HADOOP_CONF_DIR` XML files)
* - anything else / no scheme → local filesystem
*
* Additionally, any `*.xml` files found in `~/.softclient4es/` are loaded on top of the
* auto-detected configuration, allowing per-user overrides.
*
* Cloud connector JARs (`hadoop-aws`, `hadoop-azure`, `gcs-connector`) must be present in the
* classpath at runtime (declared as `provided` dependencies in the build).
*/
object HadoopConfigurationFactory {

  private val factoryLogger: Logger = LoggerFactory.getLogger("HadoopConfigurationFactory")

  /** Returns the value of env var `name`, falling back to the JVM system property of the same
    * name. This allows test harnesses to inject credentials via `System.setProperty(...)` when
    * environment variables cannot be mutated at runtime.
    *
    * All builders below resolve credentials through this helper (not raw `sys.env`) so the
    * property-injection mechanism works uniformly for every backend.
    */
  private def envOrProp(name: String): Option[String] =
    sys.env.get(name).orElse(Option(System.getProperty(name)))

  /** Returns a [[Configuration]] appropriate for the URI scheme embedded in `path`.
    *
    * An unparsable path, or a path without a scheme, falls through to the local filesystem
    * configuration. Per-user XML overrides from `~/.softclient4es/` are applied last.
    *
    * @param path the source path whose URI scheme selects the filesystem configuration
    * @return a fully initialised Hadoop [[Configuration]]
    */
  def forPath(path: String): Configuration = {
    // Option instead of a nullable String: URI.getScheme returns null for scheme-less paths,
    // and URI construction itself may throw on malformed input — both map to localConf().
    val scheme: Option[String] =
      Try(Option(new java.net.URI(path).getScheme)).toOption.flatten
    val conf = scheme match {
      case Some("s3a") | Some("s3")                                    => s3aConf()
      case Some("abfs") | Some("abfss") | Some("wasb") | Some("wasbs") => azureConf()
      case Some("gs")                                                  => gcsConf()
      case Some("hdfs")                                                => hdfsConf()
      case _                                                           => localConf()
    }
    loadUserXmlConf(conf)
    conf
  }

  // -------------------------------------------------------------------------
  // Private builders
  // -------------------------------------------------------------------------

  /** Common tuning shared by every filesystem configuration. */
  private def base(): Configuration = {
    val conf = new Configuration()
    // Read legacy Parquet INT96 timestamps as fixed-length byte arrays.
    conf.setBoolean("parquet.avro.readInt96AsFixed", true)
    conf.setInt("io.file.buffer.size", 65536)
    conf.setBoolean("fs.automatic.close", true)
    conf
  }

  /** Local filesystem — used for scheme-less paths and unknown schemes. */
  private def localConf(): Configuration = {
    val conf = base()
    conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
    conf
  }

  /** AWS S3 via S3AFileSystem.
    *
    * Reads (in priority order):
    *   - `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` / `AWS_SESSION_TOKEN`
    *   - `AWS_REGION` or `AWS_DEFAULT_REGION`
    *   - `AWS_ENDPOINT_URL` for S3-compatible stores (MinIO, LocalStack, …)
    *
    * Falls back to `DefaultAWSCredentialsProviderChain` (IAM roles, `~/.aws/credentials`, …) when
    * no explicit credentials are found.
    */
  private def s3aConf(): Configuration = {
    val conf = base()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

    val hasStaticCreds = envOrProp("AWS_ACCESS_KEY_ID").isDefined
    if (hasStaticCreds) {
      envOrProp("AWS_ACCESS_KEY_ID").foreach(conf.set("fs.s3a.access.key", _))
      envOrProp("AWS_SECRET_ACCESS_KEY").foreach(conf.set("fs.s3a.secret.key", _))
      envOrProp("AWS_SESSION_TOKEN").foreach(conf.set("fs.s3a.session.token", _))
    } else {
      // No static credentials: delegate to the AWS default chain (IAM role, profile, …).
      conf.set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
      )
    }

    envOrProp("AWS_REGION")
      .orElse(envOrProp("AWS_DEFAULT_REGION"))
      .foreach(conf.set("fs.s3a.endpoint.region", _))

    // Custom endpoint implies an S3-compatible store, which requires path-style access.
    envOrProp("AWS_ENDPOINT_URL").foreach { url =>
      conf.set("fs.s3a.endpoint", url)
      conf.setBoolean("fs.s3a.path.style.access", true)
    }

    factoryLogger.info("Configured S3A filesystem")
    conf
  }

  /** Azure ADLS Gen2 (`abfs`/`abfss`) and Azure Blob Storage (`wasb`/`wasbs`).
    *
    * Authentication is resolved in this order:
    *   1. Shared-key: `AZURE_STORAGE_ACCOUNT_NAME` + `AZURE_STORAGE_ACCOUNT_KEY`
    *   1. OAuth2 service principal: `AZURE_CLIENT_ID` + `AZURE_CLIENT_SECRET` + `AZURE_TENANT_ID`
    *   1. SAS token: `AZURE_STORAGE_ACCOUNT_NAME` + `AZURE_STORAGE_SAS_TOKEN`
    *
    * Credentials are resolved via [[envOrProp]] so tests may inject them as system properties.
    */
  private def azureConf(): Configuration = {
    val conf = base()
    conf.set("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem")
    conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
    conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure")

    val accountName = envOrProp("AZURE_STORAGE_ACCOUNT_NAME")

    // 1. Shared-key — covers both the ADLS Gen2 (dfs) and Blob (blob) endpoints.
    for {
      account <- accountName
      key     <- envOrProp("AZURE_STORAGE_ACCOUNT_KEY")
    } {
      conf.set(s"fs.azure.account.key.$account.dfs.core.windows.net", key)
      conf.set(s"fs.azure.account.key.$account.blob.core.windows.net", key)
    }

    // 2. OAuth2 service principal (client-credentials flow against the tenant's token endpoint).
    for {
      clientId     <- envOrProp("AZURE_CLIENT_ID")
      clientSecret <- envOrProp("AZURE_CLIENT_SECRET")
      tenantId     <- envOrProp("AZURE_TENANT_ID")
      account      <- accountName
    } {
      conf.set(s"fs.azure.account.auth.type.$account.dfs.core.windows.net", "OAuth")
      conf.set(
        s"fs.azure.account.oauth.provider.type.$account.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
      )
      conf.set(
        s"fs.azure.account.oauth2.client.endpoint.$account.dfs.core.windows.net",
        s"https://login.microsoftonline.com/$tenantId/oauth2/token"
      )
      conf.set(
        s"fs.azure.account.oauth2.client.id.$account.dfs.core.windows.net",
        clientId
      )
      conf.set(
        s"fs.azure.account.oauth2.client.secret.$account.dfs.core.windows.net",
        clientSecret
      )
    }

    // 3. SAS token (Blob endpoint only).
    for {
      account  <- accountName
      sasToken <- envOrProp("AZURE_STORAGE_SAS_TOKEN")
    } {
      conf.set(s"fs.azure.sas.$account.blob.core.windows.net", sasToken)
    }

    factoryLogger.info("Configured Azure filesystem (ADLS Gen2 / Blob Storage)")
    conf
  }

  /** Google Cloud Storage via the GCS Hadoop connector.
    *
    * Reads:
    *   - `GOOGLE_APPLICATION_CREDENTIALS` → path to a service-account JSON key file
    *   - `GOOGLE_CLOUD_PROJECT` → GCS project id (optional)
    *
    * Falls back to Application Default Credentials (ADC) when no env var is set.
    * Credentials are resolved via [[envOrProp]] so tests may inject them as system properties.
    */
  private def gcsConf(): Configuration = {
    val conf = base()
    conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

    envOrProp("GOOGLE_APPLICATION_CREDENTIALS") match {
      case Some(keyFile) =>
        conf.set("google.cloud.auth.service.account.enable", "true")
        conf.set("google.cloud.auth.service.account.json.keyfile", keyFile)
      case None =>
        // Use Application Default Credentials (Workload Identity, gcloud auth, …)
        conf.set("google.cloud.auth.type", "APPLICATION_DEFAULT")
    }

    envOrProp("GOOGLE_CLOUD_PROJECT").foreach(conf.set("fs.gs.project.id", _))

    factoryLogger.info("Configured GCS filesystem")
    conf
  }

  /** HDFS — the namenode is encoded in the path URI itself (`hdfs://namenode:port/…`).
    *
    * Also loads `core-site.xml` and `hdfs-site.xml` from `HADOOP_CONF_DIR` when available, and
    * propagates `HADOOP_USER_NAME` as a system property (read by Hadoop's UserGroupInformation).
    */
  private def hdfsConf(): Configuration = {
    val conf = base()

    envOrProp("HADOOP_USER_NAME").foreach(System.setProperty("HADOOP_USER_NAME", _))

    envOrProp("HADOOP_CONF_DIR").foreach { dir =>
      Seq("core-site.xml", "hdfs-site.xml").foreach { xml =>
        val f = new java.io.File(dir, xml)
        if (f.exists()) {
          factoryLogger.info(s"Loading HDFS config: ${f.getAbsolutePath}")
          conf.addResource(new Path(f.getAbsolutePath))
        }
      }
    }

    factoryLogger.info("Configured HDFS filesystem")
    conf
  }

  /** Loads every `*.xml` file found in `~/.softclient4es/` on top of the given configuration.
    * This lets users override or extend any property set by the auto-detection logic.
    *
    * Later resources win in Hadoop's `addResource` semantics, so user files override defaults.
    */
  private def loadUserXmlConf(conf: Configuration): Unit = {
    val userConfDir = new java.io.File(System.getProperty("user.home"), ".softclient4es")
    if (userConfDir.isDirectory) {
      // listFiles may return null on I/O error even for an existing directory — guard it.
      val xmlFiles = userConfDir.listFiles(f => f.isFile && f.getName.endsWith(".xml"))
      if (xmlFiles != null) {
        xmlFiles.foreach { f =>
          factoryLogger.info(s"Loading user Hadoop config override: ${f.getAbsolutePath}")
          conf.addResource(new Path(f.getAbsolutePath))
        }
      }
    }
  }
}

/** Factory to get the appropriate FileSource based on file format
*/
object FileSourceFactory {
Expand Down
69 changes: 69 additions & 0 deletions documentation/sql/dml_statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,75 @@ ON CONFLICT (uuid) DO UPDATE;

---

### Remote File System Support

`COPY INTO` transparently supports remote file systems by auto-detecting the URI scheme in the `FROM` path.
No SQL syntax change is required — simply use the appropriate URI scheme.

| URI scheme | File system | Required JAR |
| --- | --- | --- |
| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` |
| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` |
| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` |
| `hdfs://` | HDFS | _(bundled with hadoop-client)_ |
| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ |

> **Important:** Cloud connector JARs are declared as `provided` dependencies and are **not bundled** in the library.
> They must be present in the runtime classpath (e.g. added to the CLI assembly or the application's fat-jar).

#### Credentials configuration

Authentication is resolved automatically from standard environment variables:

**AWS S3**
```
AWS_ACCESS_KEY_ID # access key (falls back to DefaultAWSCredentialsProviderChain)
AWS_SECRET_ACCESS_KEY # secret key
AWS_SESSION_TOKEN # session token (optional)
AWS_REGION # region (or AWS_DEFAULT_REGION)
AWS_ENDPOINT_URL # custom endpoint for S3-compatible stores (MinIO, LocalStack, …)
```

**Azure ADLS Gen2 / Blob Storage**
```
AZURE_STORAGE_ACCOUNT_NAME # storage account name
AZURE_STORAGE_ACCOUNT_KEY # shared key (Option 1)
AZURE_CLIENT_ID # service principal client ID (Option 2 — OAuth2)
AZURE_CLIENT_SECRET # service principal secret (Option 2 — OAuth2)
AZURE_TENANT_ID # Azure tenant ID (Option 2 — OAuth2)
AZURE_STORAGE_SAS_TOKEN # SAS token (Option 3)
```

**Google Cloud Storage**
```
GOOGLE_APPLICATION_CREDENTIALS # path to service-account JSON key file
GOOGLE_CLOUD_PROJECT # GCS project ID (optional)
```
Falls back to Application Default Credentials (Workload Identity, `gcloud auth`, …) when the variable is absent.

**HDFS**
```
HADOOP_CONF_DIR # directory containing core-site.xml and hdfs-site.xml
HADOOP_USER_NAME # Hadoop user name (optional)
```

#### Per-user Hadoop overrides

Any `*.xml` file placed in `~/.softclient4es/` is loaded on top of the auto-detected configuration.
This allows fine-grained property overrides without changing environment variables.

```xml
<!-- ~/.softclient4es/s3a-override.xml -->
<configuration>
<property>
<name>fs.s3a.connection.maximum</name>
<value>200</value>
</property>
</configuration>
```

---

## DML Lifecycle Example

```sql
Expand Down
Loading