[NGSOK-1703] Pull changes from v4.2.0-rc3 #39
Merged
…istTableAndViewSummaries
### What changes were proposed in this pull request?
Rename the unified listing method on `TableViewCatalog` from `listRelationSummaries` to `listTableAndViewSummaries`. Updates the declaration in `TableViewCatalog.java` and the two existing references introduced by [SPARK-56655](https://issues.apache.org/jira/browse/SPARK-56655):
- `sql/catalyst/.../TableViewCatalog.java` -- method declaration + javadoc
- `sql/core/.../ShowTablesExec.scala` -- call site + scaladoc reference
- `sql/core/.../DataSourceV2MetadataViewSuite.scala` -- comment reference
### Why are the changes needed?
The new name explicitly states what the method returns (tables and views) and avoids overloading Spark's existing "Relation" terminology (`BaseRelation`, `LogicalRelation`, etc.), matching the rename rationale that already changed `RelationCatalog` -> `TableViewCatalog` in SPARK-56655. The API is still `Evolving` and unreleased, so this is a pre-release rename with no deprecation cycle needed.
### Does this PR introduce _any_ user-facing change?
No end-user behavior change.
### How was this patch tested?
Pure rename; existing tests in `DataSourceV2MetadataViewSuite` exercise the renamed method via `SHOW TABLES` on a `TableViewCatalog`.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Anthropic), claude-opus-4
Closes apache#55616 from gengliangwang/SPARK-56672.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
…esClient` to `k8s` package
### What changes were proposed in this pull request?
This PR promotes the `kubernetesClient` constructor parameter of `KubernetesClusterSchedulerBackend` to a `private[k8s] val`, exposing it as a read-only member to other components in the `org.apache.spark.scheduler.cluster.k8s` package.
### Why are the changes needed?
Like `ExecutorResizePlugin`, many plugins currently build their own `KubernetesClient` via `SparkKubernetesClientFactory.createKubernetesClient(...)`. This produces multiple clients in the driver process that target the same API server with the same credentials.
The driver already owns a single `KubernetesClient` through `KubernetesClusterSchedulerBackend`, whose lifecycle is managed in `start()` / `stop()`. Exposing it as `private[k8s] val` lets package-local components reuse this shared client in follow-up work, without widening the API surface to other modules or external users.
This PR is intentionally limited to the visibility change. Reusing the shared client from existing plugins is left to follow-up PRs so each change stays minimal and reviewable.
### Does this PR introduce _any_ user-facing change?
No. The visibility is `private[k8s]`, so the change is invisible outside the `org.apache.spark.scheduler.cluster.k8s` package.
### How was this patch tested?
Pass the CIs with a new unit test.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes apache#55634 from dongjoon-hyun/SPARK-56684.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…columns at refresh time
Co-authored-by: Isaac
### What changes were proposed in this pull request?
This PR adds column ID support to the DSv2 Column interface so that Spark can detect when a column has been
dropped and re-added with the same name during table refresh.
Connectors that support column IDs (e.g., Delta, Iceberg) can now return a unique identifier via `Column.id()`. Spark validates these IDs at refresh time and fails with a clear error if a column has been replaced.
Column IDs are assigned at the top-level column granularity. Nested struct fields, array elements,
and map keys/values do not have separate IDs through this API.
A top-level-only ID will not detect same-name same-type drop+re-add of a nested field. This
matches the current behavior of connectors like Delta and Iceberg, where the top-level column ID
is unchanged when a nested field is dropped and re-added.
Connectors that want to detect nested drop+re-add can do so by encoding their nested field IDs
into the top-level `Column.id()` string. For example, a column `person STRUCT<name, age>` with
root ID 5 and nested IDs 10, 11 could return `"5[10,11]"` — if `age` is dropped and re-added
with new nested ID 12, the string becomes `"5[10,12]"`, and Spark detects the mismatch.
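The composition idea above can be sketched in a few lines. This is an illustration only, not the connector or Spark implementation: the `Field` class and `composed_id` helper are hypothetical names, and the bracketed format simply follows the `"5[10,11]"` example from the description.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Field:
    """Hypothetical connector-side field: its own ID plus nested children."""
    name: str
    field_id: int
    children: List["Field"] = field(default_factory=list)


def composed_id(col: Field) -> str:
    """Encode the root ID followed by the nested IDs, e.g. '5[10,11]'."""
    if not col.children:
        return str(col.field_id)
    nested = ",".join(composed_id(child) for child in col.children)
    return f"{col.field_id}[{nested}]"


before = Field("person", 5, [Field("name", 10), Field("age", 11)])
# `age` was dropped and re-added: same name, new nested ID 12.
after = Field("person", 5, [Field("name", 10), Field("age", 12)])

print(composed_id(before))  # 5[10,11]
print(composed_id(after))   # 5[10,12]
print(composed_id(before) != composed_id(after))  # True
```

Because the encoding is recursive, deeper nesting produces nested brackets, so a change at any depth alters the top-level string.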
Changes to nested fields that alter the parent's data type (e.g., dropping `person.age` changes
the struct from `STRUCT<name, age>` to `STRUCT<name>`) are caught by schema validation regardless
of ID support.
The default `InMemoryTableCatalog` preserves column IDs across type changes, matching the behavior
of real connectors like Delta and Iceberg. A new `ComposedColumnIdTableCatalog` demonstrates the
recommended adoption pattern for connectors with nested IDs: encoding the full subtree into the
top-level `Column.id()` string so that any nested change is detected.
### Why are the changes needed?
Spark's existing refresh validation only checks column names and types, so it cannot detect when a column has been replaced with a same-named column. This PR exposes column IDs through the DSv2 API so Spark can catch these scenarios at refresh time.
### Does this PR introduce _any_ user-facing change?
No. Column IDs are a new opt-in API for DSv2 connectors. Existing connectors are unaffected because `Column.id()` defaults to `null`, and null IDs are skipped during validation. Only connectors that explicitly implement `Column.id()` will see the new `COLUMN_ID_MISMATCH` error when a column is dropped and re-added between analysis and execution.
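A rough model of the null-skipping rule, under stated assumptions: column IDs are plain strings or `None`, and the function name `mismatched_columns` is hypothetical rather than Spark's API. It also assumes a column is skipped when either snapshot lacks an ID, which the description does not spell out.

```python
from typing import Dict, List, Optional


def mismatched_columns(
    analyzed: Dict[str, Optional[str]],
    refreshed: Dict[str, Optional[str]],
) -> List[str]:
    """Return names of columns whose non-null IDs differ between snapshots."""
    mismatches = []
    for name, old_id in analyzed.items():
        new_id = refreshed.get(name)
        # Assumption: a missing or null ID on either side means "no ID
        # support", so the column is left to name/type schema validation.
        if old_id is None or new_id is None:
            continue
        if old_id != new_id:
            mismatches.append(name)
    return mismatches


analyzed = {"id": "1", "salary": "2", "note": None}
refreshed = {"id": "1", "salary": "7", "note": "9"}
print(mismatched_columns(analyzed, refreshed))  # ['salary']
```

A connector that returns `None` for every column never produces a mismatch in this model, which mirrors the opt-in behavior described above.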
### How was this patch tested?
New tests added in `DataSourceV2DataFrameSuite` and `DataSourceV2ExtSessionColumnIdSuite`.
#### Testing concept
Real connectors (Delta, Iceberg, etc.) differ in how much identity tracking they support: some assign unique IDs to every column, some only to certain columns, some support table-level IDs but not column IDs, and some support none at all. The test infrastructure needs to cover this full connector support matrix so we can verify that column ID validation works correctly when the connector opts in, and is safely skipped when it does not.
To achieve this, we introduce custom test catalogs that each simulate a different real-world connector behavior. They all extend `InMemoryTableCatalog` and share the same `InMemoryBaseTable` base which auto-assigns unique column IDs via a global counter. Each catalog then selectively overrides or strips certain identity fields to model a specific connector scenario.
#### Test catalog hierarchy
```
InMemoryTableCatalog (base: full table ID + full column IDs)
|
+-- NullTableIdInMemoryTableCatalog
| Table ID = null, column IDs = normal
| Purpose: verify column IDs alone detect drop+recreate without table-level identity
|
| +-- SharedInMemoryTableCatalog
| Tables stored in a static map shared across SparkSessions
| Purpose: verify cross-session detection (session1 holds stale DF, session2 mutates)
|
+-- NullColumnIdInMemoryTableCatalog
| Table ID = normal, column IDs = ALL null
| Purpose: verify validation is safely skipped when connector does not support column IDs
|
+-- MixedColumnIdTableCatalog
| Table ID = normal, column IDs = selectively null per column name
| Purpose: verify that columns with null IDs are skipped without incorrectly failing
|
+-- TypeChangeResetsColIdTableCatalog
| Table ID = normal, column IDs reset to fresh values on type changes
| Purpose: verify column ID mismatch fires when type changes produce new IDs
|
+-- ComposedColumnIdTableCatalog
Table ID = normal, nested field IDs encoded into top-level Column.id() string
Purpose: verify nested drop+re-add and deep nesting detected via subtree-ID composition
```
#### What is tested
| Catalog | What it tests |
|---------|--------------|
| `testcat` (`InMemoryTableCatalog`) | Full column ID support, IDs preserved across type changes (matching Delta/Iceberg). Bulk of the tests: drop+re-add (same/different type/case, multiple cols, complex types), access patterns (joins, filters, aggregates, sorts, subqueries, temp views, writeTo, insertInto), layered defense (type widening and nested type change caught by schema validation when ID is preserved), nested field ID preservation (parent and sibling IDs unchanged after nested drop or add), and no-false-positive checks (column additions, data-only inserts, nested field additions tolerated) |
| `nullidcat` (`NullTableIdInMemoryTableCatalog`) | Column IDs alone detect drop+recreate table scenario without table-level identity |
| `nullcolidcat` (`NullColumnIdInMemoryTableCatalog`) | Validation is safely skipped when all column IDs are null |
| `mixedcolidcat` (`MixedColumnIdTableCatalog`) | Columns with null IDs are skipped; columns with non-null IDs are still validated |
| `resetidcat` (`TypeChangeResetsColIdTableCatalog`) | Column ID mismatch fires when type changes produce fresh IDs (unlike the default which preserves IDs) |
| `composedidcat` (`ComposedColumnIdTableCatalog`) | Nested drop+re-add detected via subtree-ID composition (structs, arrays, maps), depth-3 nesting, drop+re-add with same name changes composed ID, data insertion tolerance |
| `sharedcat` (`SharedInMemoryTableCatalog`) | Cross-session: session2 mutates table, session1's stale DataFrame detects it at refresh |
### Was this patch authored or co-authored using generative AI tooling?
Yes
Closes apache#55376 from longvu-db/dsv2-column-id.
Authored-by: Thang Long Vu <long.vu@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
This PR wires frozen SQL PATH semantics into analysis for persisted views and SQL functions (scalar and table). Specifically:
- `AnalysisContext.withAnalysisContext(viewDesc)` now reads `viewStoredResolutionPath` and seeds `resolutionPathEntries` from persisted metadata when present.
- `AnalysisContext.withAnalysisContext(function)` now reads `functionStoredResolutionPath` and seeds `resolutionPathEntries` for SQL function body analysis.
- `CatalogManager.deserializePathEntries` is added to parse stored JSON path entries into analysis-time path entries.
- Relation-resolution comments/docs are updated to reflect that persisted frozen path is now applied.
- Regression tests are added:
  - `SQLViewSuite`: persisted view keeps creation-time PATH semantics.
  - `SQLFunctionSuite`: persisted SQL scalar function keeps creation-time PATH semantics.
  - `SQLFunctionSuite`: persisted SQL table function keeps creation-time PATH semantics.
### Why are the changes needed?
Without this wiring, persisted views/functions may resolve unqualified names using the caller's current PATH instead of the PATH captured at creation time. That can cause behavior drift after `SET PATH` changes. This PR makes persisted object resolution stable and deterministic.
### Does this PR introduce _any_ user-facing change?
Yes. For persisted views and SQL functions (including SQL table functions) created with PATH enabled, unqualified name resolution now consistently follows the stored creation-time PATH, even if the session PATH changes later.
### How was this patch tested?
Added/updated unit tests and ran focused suites locally:
- `build/sbt 'sql/testOnly org.apache.spark.sql.execution.SimpleSQLViewSuite'`
- `build/sbt 'sql/testOnly org.apache.spark.sql.execution.SQLFunctionSuite'`
Both suites passed with the new regression tests.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor Codex 5.3
Closes apache#55569 from srielau/SPARK-56639-frozen-path-semantics.
Authored-by: Serge Rielau <serge@rielau.com>
Signed-off-by: Allison Wang <allison.wang@databricks.com>
…sClusterSchedulerBackend.kubernetesClient`
### What changes were proposed in this pull request?
This PR improves `ExecutorResizeDriverPlugin` to reuse the `KubernetesClient` owned by `KubernetesClusterSchedulerBackend` instead of creating its own client via `SparkKubernetesClientFactory.createKubernetesClient(...)`.
### Why are the changes needed?
Before this PR, the driver process held two `KubernetesClient` instances pointing at the same API server with the same credentials: one owned by `KubernetesClusterSchedulerBackend` and one created by `ExecutorResizeDriverPlugin`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the updated unit tests.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes apache#55639 from dongjoon-hyun/SPARK-56689.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… operations
### What changes were proposed in this pull request?
This PR implements group filtering for WriteDelta row level operations.
It re-applies apache#55612 (commit `5ef2e1ba174`, reverted in `8e8fee2692f`) and resolves the test failures reported in apache#55612 (comment) by updating the scan-count assertions in the transactional check tests in `MergeIntoTableSuiteBase` and `UpdateTableSuiteBase`. With group filtering, `matchingRowsPlan` re-scans the target, and for MERGE `RewritePredicateSubquery` also re-scans the source. For MERGE the delta scan counts now match the non-delta values, so the `deltaMerge` conditionals collapse. For UPDATE the delta counts double but remain under the non-delta values because `ReplaceData` still adds further scans.
### Why are the changes needed?
These changes are needed to close the gap in WriteDelta plans.
### Does this PR introduce _any_ user-facing change?
Changes are backward compatible.
### How was this patch tested?
This PR comes with tests. Locally verified all 9 affected suites are green (517 tests):
```
build/sbt 'sql/testOnly \
  org.apache.spark.sql.connector.DeltaBasedMergeIntoTableSuite \
  org.apache.spark.sql.connector.DeltaBasedMergeIntoTableWithDeletionVectorsSuite \
  org.apache.spark.sql.connector.DeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite \
  org.apache.spark.sql.connector.DeltaBasedUpdateTableSuite \
  org.apache.spark.sql.connector.DeltaBasedUpdateTableWithDeletionVectorsSuite \
  org.apache.spark.sql.connector.DeltaBasedUpdateAsDeleteAndInsertTableSuite \
  org.apache.spark.sql.connector.DeltaBasedNoMetadataDeleteFromTableSuite \
  org.apache.spark.sql.connector.GroupBasedMergeIntoTableSuite \
  org.apache.spark.sql.connector.GroupBasedUpdateTableSuite'
```
### Was this patch authored or co-authored using generative AI tooling?
Claude Code v2.1.123.
Closes apache#55635 from gengliangwang/spark-56669-redo.
Lead-authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Co-authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
This PR aims to support a new `ExecutorPVCResizePlugin` that monitors executor PVC disk usage
and grows each PVC's `spec.resources.requests.storage` when usage exceeds a
threshold.
The executor side reports the max filesystem usage ratio across
`DiskBlockManager.localDirs`. The driver side patches the executor pod's PVCs to
`currentSize * (1 + factor)` when the reported ratio exceeds the threshold.
New configurations:
| Key | Default | Meaning |
|---|---|---|
| `spark.kubernetes.executor.pvc.resizeInterval` | `0min` | Resize check interval. `0` disables. |
| `spark.kubernetes.executor.pvc.resizeThreshold` | `0.5` | Usage ratio above which a resize is triggered. |
| `spark.kubernetes.executor.pvc.resizeFactor` | `1.0` | Growth factor. |
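The sizing rule can be written out as a small calculation. This is a sketch, not the plugin's code: the function name `target_pvc_size` is hypothetical, and the defaults mirror the threshold and factor from the table above.

```python
def target_pvc_size(
    current_bytes: int,
    usage_ratio: float,
    threshold: float = 0.5,
    factor: float = 1.0,
) -> int:
    """Return the storage request after one resize check.

    The size grows to current * (1 + factor) only when the reported
    usage ratio exceeds the threshold; otherwise it is left unchanged.
    """
    if usage_ratio <= threshold:
        return current_bytes
    return int(current_bytes * (1 + factor))


fifty_gib = 50 * 1024 ** 3  # 53687091200 bytes
print(target_pvc_size(fifty_gib, 0.6136656796630462))    # 107374182400
print(target_pvc_size(fifty_gib, 3.507855787665699e-4))  # 53687091200
```

With the default factor of `1.0` each resize doubles the volume, which matches the 53687091200 to 107374182400 jump in the driver logs further down.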
Note that PVCs on public clouds have clear limitations. For example, a volume can only be increased, and only **once every six hours**. So, for a short Spark job (whose lifetime is less than 6 hours), `ExecutorPVCResizePlugin` can increase a PVC only one time. For a recently increased PVC, K8s shows a warning like the following.
```
Warning VolumeResizeFailed 6s (x8 over 35s) external-resizer ebs.csi.aws.com
resize volume "pvc-baccb8c8-16e7-4b31-b254-dd223a3e0be3" by resizer "ebs.csi.aws.com" failed: rpc error:
code = Internal desc = Could not resize volume "vol-0cb3afad24b360ebd": rpc error: code = Internal desc = Could not modify
volume "vol-0cb3afad24b360ebd": volume "vol-0cb3afad24b360ebd" in OPTIMIZING state, cannot currently modify
```
### Why are the changes needed?
PVC-backed `SPARK_LOCAL_DIRS` must be sized conservatively up front to avoid
mid-job disk-full failures, which wastes storage cost. `ExecutorResizePlugin`
already established the observe-and-patch pattern for memory; this extends it to
PVC storage.
### Does this PR introduce _any_ user-facing change?
No. The user needs to add this plugin to `spark.plugins` explicitly.
**SUBMIT**
```
bin/spark-submit \
--master k8s://$K8S_MASTER \
--deploy-mode cluster \
-c spark.executor.cores=4 \
-c spark.executor.memory=4g \
-c spark.kubernetes.container.image=docker.apple.com/d_hyun/spark:20260430 \
-c spark.kubernetes.authenticate.driver.serviceAccountName=spark \
-c spark.kubernetes.driver.pod.name=pi \
-c spark.kubernetes.executor.podNamePrefix=pi \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=50Gi \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3 \
-c spark.kubernetes.driver.podTemplateFile=eks-root-pod.yml \
-c spark.kubernetes.executor.podTemplateFile=eks-root-pod.yml \
-c spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin \
-c spark.kubernetes.executor.pvc.resizeInterval=1m \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/spark-examples.jar 400000
```
**DRIVER LOGS**
```
$ kubectl logs -f pi | grep Plugin
26/05/01 01:39:38 INFO ExecutorPVCResizeDriverPlugin: ExecutorPVCResizeDriverPlugin is scheduled
26/05/01 01:39:38 INFO DriverPluginContainer: Initialized driver component for plugin org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin.
26/05/01 01:40:38 INFO ExecutorPVCResizeDriverPlugin: Latest PVC usage reports: {}
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Latest PVC usage reports: {1=PVCDiskUsageReport(1,0.6136656796630462), 2=PVCDiskUsageReport(2,3.507855787665699E-4)}
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Try to resize executor 1 PVC pi-exec-1-pvc-0 with ratio 0.6136656796630462 (threshold 0.5).
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Increase PVC pi-exec-1-pvc-0 storage from 53687091200 to 107374182400 as usage ratio exceeded threshold.
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Try to resize executor 2 PVC pi-exec-2-pvc-0 with ratio 3.507855787665699E-4 (threshold 0.5).
```
**EXECUTOR SIZE REPORTING (60% -> 30%)**
```
$ kubectl logs -f pi-exec-1 | grep Plugin
26/05/01 01:22:54 INFO ExecutorPVCResizeExecutorPlugin: Reporting max PVC disk usage ratio for executor 1: 0.6136656796630462
26/05/01 01:23:54 INFO ExecutorPVCResizeExecutorPlugin: Reporting max PVC disk usage ratio for executor 1: 0.30591566408202353
```
**RESIZED PVC**
```
$ kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS VOLUMEATTRIBUTESCLASS AGE
pi-exec-1-pvc-0 Bound pvc-d279a3da-ddfb-41c2-a32b-0f2bd83941c4 107374182400 RWOP gp3 <unset> 2m28s
pi-exec-2-pvc-0 Bound pvc-79f092d3-4a8d-4981-946d-d745d4038fd6 50Gi RWOP gp3 <unset> 2m28s
```
### How was this patch tested?
Pass the CIs with a new `ExecutorPVCResizePluginSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7 (1M context)
Closes apache#55642 from dongjoon-hyun/SPARK-56693.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR adds a new scalar SQL function `time_bucket(bucket_size, ts[, origin])` that aligns a timestamp to the start of a fixed-size interval bucket.
Given a bucket size (day-time or year-month interval), a timestamp, and an optional origin, it returns the start of the half-open bucket `[start, start + bucket_size)` containing the timestamp. Buckets are anchored at `origin` (default `1970-01-01 00:00:00`, in the session time zone for `TIMESTAMP`, in UTC for `TIMESTAMP_NTZ`) and the grid extends infinitely in both directions.
For `TIMESTAMP_NTZ`, bucketing is performed in UTC. For `TIMESTAMP`, year-month interval buckets and calendar-day components of day-time interval buckets align to the session time zone, matching `+ INTERVAL '<k*bucket_size>'` semantics across DST transitions.
Changes:
- New `TimeBucket` expression (extends `TimeZoneAwareExpression`) in `sql/catalyst/.../expressions/datetimeExpressions.scala`, with an `ExpressionBuilder` that dispatches the two- and three-argument forms.
- Bucketing helpers `timeBucketDTInterval` / `timeBucketYMInterval` in `DateTimeUtils.scala`, with overflow checks on extreme timestamps and origins.
- Registered in `FunctionRegistry`.
- Scala API: `functions.time_bucket(...)`.
- PySpark API: `pyspark.sql.functions.time_bucket` + Connect variant.
### Why are the changes needed?
Aligning timestamps to fixed-size buckets (15 minutes, 1 hour, 1 month, etc.) is a common time-series pattern, but today users must assemble it manually, e.g., via `date_trunc` for calendar-aligned buckets or unix-timestamp arithmetic for fixed-second buckets, neither of which supports arbitrary year-month intervals or a non-default origin.
### Does this PR introduce _any_ user-facing change?
Yes, a new function `time_bucket` is available in SQL, Scala, and PySpark.
Example:
```sql
SELECT time_bucket(INTERVAL '15' MINUTE, TIMESTAMP '2024-01-01 11:27:00');
-- 2024-01-01 11:15:00
SELECT time_bucket(
  INTERVAL '15' MINUTE,
  TIMESTAMP '2024-01-01 11:27:00',
  TIMESTAMP '1970-01-01 00:05:00');
-- 2024-01-01 11:20:00
```
### How was this patch tested?
- New unit tests in DateExpressionsSuite covering codegen and interpreted paths, DT and YM intervals, TIMESTAMP / TIMESTAMP_NTZ, NULL propagation, negative/zero bucket-size validation, ExpressionBuilder, session-zone behavior across DST including the fall-back fold.
- New unit tests in DateTimeUtilsSuite for timeBucketDTInterval / timeBucketYMInterval: boundary values, pre-epoch and negative timestamps, extreme-origin overflow, compound DT buckets crossing DST, YM bucket alignment across the four DST-fold edge cases.
- New SQL golden file sql-tests/inputs/time-bucket.sql covering DT + YM intervals, TIMESTAMP + TIMESTAMP_NTZ, explicit origins, non-UTC session bucketing across DST (America/Los_Angeles), NULL propagation, and invalid inputs (non-foldable, wrong types, non-positive).
- PySpark doctest.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes apache#55535 from vranes/time-bucket.
Authored-by: Nikolina Vraneš <nikolina.vranes@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
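For fixed-length (day-time) buckets with no time zone involved, the alignment described above reduces to floor division from the origin. The following is a minimal sketch of that arithmetic only; `time_bucket_utc` is a hypothetical name, it uses naive `datetime` values, and it does not model year-month intervals, session time zones, DST, or the overflow checks in the actual helpers.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # default origin from the description


def time_bucket_utc(
    bucket_size: timedelta, ts: datetime, origin: datetime = EPOCH
) -> datetime:
    """Start of the half-open bucket [start, start + bucket_size) holding ts."""
    if bucket_size <= timedelta(0):
        raise ValueError("bucket_size must be positive")
    # Floor division rounds toward negative infinity, so timestamps
    # before the origin still land at the start of their bucket.
    k = (ts - origin) // bucket_size
    return origin + k * bucket_size


ts = datetime(2024, 1, 1, 11, 27)
print(time_bucket_utc(timedelta(minutes=15), ts))
# 2024-01-01 11:15:00
print(time_bucket_utc(timedelta(minutes=15), ts, datetime(1970, 1, 1, 0, 5)))
# 2024-01-01 11:20:00
```

The two printed values reproduce the SQL example: shifting the origin by five minutes moves the grid to :05, :20, :35, :50, so 11:27 falls into the bucket starting at 11:20.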
### What changes were proposed in this pull request?
Added support for connectors to provide custom metrics in Truncate and Delete operations. Refactored the existing custom metrics logic to be reusable across different plan nodes.
### Why are the changes needed?
For connectors to provide metric values.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes apache#55511 from ZiyaZa/custom-metrics.
Authored-by: Ziya Mukhtarov <ziya5muxtarov@gmail.com>
Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
…CRIBE TABLE EXTENDED for v2 tables and views
### What changes were proposed in this pull request?
Standardize the `# Detailed Table Information` / `# Detailed View
Information` block in `DESCRIBE TABLE EXTENDED` output for v2 tables
and views to emit structured rows derived from the resolved
identifier:
- For tables (`DescribeTableExec`): the single `Name` row that came
from `Table.name()` is replaced by `Catalog`, `Namespace`,
`Database`, and `Table`.
- For views (`DescribeV2ViewExec`): the `Catalog` + `Identifier`
pair (where `Identifier` was a single string concatenating
namespace and name with `.`) is replaced by `Catalog`,
`Namespace`, `Database`, and `View`.
The catalog name and resolved `Identifier` are threaded from
`ResolvedTable` / `ResolvedPersistentView` through the v2 execs.
`DescribeTablePartitionExec` is updated to pass the catalog name to
the inner `DescribeTableExec` it constructs for the schema/partition
header.
The `Namespace` row uses `Identifier.namespace().quoted` —
dot-separated, with back-tick quoting only on segments that need it
— matching the existing Spark convention for multi-segment
namespaces. This keeps the row round-trip-safe for namespaces with
dots in segments while staying readable for the common single-level
case.
#### `Database` row for v1 compatibility
v1 `DescribeTableCommand` (via `CatalogTable.toJsonLinkedHashMap`)
emits `Catalog` / `Database` / `Table` rows, where `Database` is
the single-string `database` field of `TableIdentifier`. To keep
DESCRIBE consumers that read the `Database` row working uniformly
across v1 (HMS) and v2, this PR also emits a `Database` row in the
v2 output. The row is **always present**:
- For a single-segment namespace, `Database` is that single
segment (matches v1 exactly).
- For a multi-segment namespace, `Database` is the trailing
segment — multi-segment namespaces still surface their leaf
segment under the v1-compat row, while consumers that need the
full namespace read `Namespace`.
- For a root-level entity (empty namespace), `Database` is the
empty string. The row is still emitted so the layout is uniform
across all v2 namespaces.
`Database` alone is not round-trip-safe for multi-segment cases;
`Namespace` is the canonical v2 representation.
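The row derivation above can be modeled as follows. This is a sketch under assumptions, not the `DescribeTableExec` code: `identifier_rows` and `quote_if_needed` are hypothetical names, and the quoting rule (back-ticks for any segment that is not a plain identifier) is a simplified stand-in for `Identifier.namespace().quoted`.

```python
import re
from typing import List, Tuple

# Assumption: a segment needs quoting unless it is a plain identifier.
_PLAIN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def quote_if_needed(segment: str) -> str:
    """Back-tick quote a namespace segment only when it needs it."""
    if _PLAIN.match(segment):
        return segment
    return "`" + segment.replace("`", "``") + "`"


def identifier_rows(
    catalog: str, namespace: List[str], name: str, kind: str = "Table"
) -> List[Tuple[str, str]]:
    """Build the Catalog / Namespace / Database / Table-or-View rows."""
    return [
        ("Catalog", catalog),
        ("Namespace", ".".join(quote_if_needed(s) for s in namespace)),
        # v1-compat row: trailing segment, or empty string at root level.
        ("Database", namespace[-1] if namespace else ""),
        (kind, name),
    ]


for row in identifier_rows("testcat", ["ns1", "ns2"], "t"):
    print(row)
```

For a namespace such as `["a.b", "c"]` this model emits `Namespace` as `` `a.b`.c `` and `Database` as `c`, which shows why only `Namespace` is round-trip-safe.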
### Why are the changes needed?
In a multi-catalog deployment, the catalog name is a first-class
part of a v2 table or view identifier. The previous output buried
it inside connector-controlled strings:
- `Table.name()` for tables is connector-defined; some connectors
return `catalog.namespace.name`, others just `namespace.name`,
others use a custom format. The result is that `DESCRIBE TABLE`
output looks different across catalogs even for the same logical
table shape.
- `Identifier` for v2 views collapsed namespace and name into a
single dotted string, so consumers had to parse the dot back out
and could not unambiguously round-trip multi-level namespaces
with dots in segments.
Splitting the components into `Catalog`, `Namespace`,
`Database`, and `Table` / `View` rows:
- gives `DESCRIBE TABLE EXTENDED` a uniform shape across v2
connectors,
- makes the catalog name explicit and surfaceable when multiple v2
catalogs are configured,
- handles multi-level namespaces naturally via
`Identifier.namespace().quoted`,
- aligns the table and view sections so consumers can read the same
rows from either, switching only on the section header
(`# Detailed Table Information` vs `# Detailed View Information`),
- with the always-emitted `Database` compatibility row, lets
consumers built for v1 (HMS) keep working without changes,
- is parseable programmatically without splitting strings.
### Does this PR introduce any user-facing change?
Yes, slight output change in `DESCRIBE TABLE EXTENDED` for v2 tables
and v2 views.
For v2 tables, single-segment namespace (most common):
- Before: `Name | testcat.ns.t | `
- After: `Catalog | testcat | `, `Namespace | ns | `,
`Database | ns | `, `Table | t | `.
For v2 tables, multi-segment namespace:
- Before: `Name | testcat.ns1.ns2.t | `
- After: `Catalog | testcat | `, `Namespace | ns1.ns2 | `,
`Database | ns2 | `, `Table | t | `.
For v2 views, single-segment namespace:
- Before: `Catalog | testcat | `, `Identifier | ns.v | `
- After: `Catalog | testcat | `, `Namespace | ns | `,
`Database | ns | `, `View | v | `.
For v2 views, multi-segment namespace:
- Before: `Catalog | testcat | `, `Identifier | ns1.ns2.v | `
- After: `Catalog | testcat | `, `Namespace | ns1.ns2 | `,
`Database | ns2 | `, `View | v | `.
v1 paths (session-catalog tables and views via HMS) are unchanged.
Tools that read DESCRIBE output should switch from concatenating
`Name` / `Identifier` to reading the structured rows.
### How was this patch tested?
- Updated the affected golden assertion in `DescribeTableSuite`
(`DESCRIBE TABLE EXTENDED of a partitioned table`) to match the
new row layout including the `Database` compatibility row.
- Added focused tests in v2 `DescribeTableSuite` pinning the
structured rows on a freshly created v2 table for both
single-segment (`ns`) and multi-segment (`ns1.ns2`) namespaces —
the multi-segment test pins that `Database` carries the trailing
segment while `Namespace` carries the full dot-joined form.
- Added parallel tests in v2 `DescribeViewSuite` pinning the same
layout for v2 views (single-segment and multi-segment).
- Removed the now-redundant `DESCRIBE TABLE EXTENDED on a non-view
MetadataTable shows the real identifier` test in
`DataSourceV2MetadataTableSuite` (the structured-row layout is
what's pinned by the new tests in `v2.DescribeTableSuite`; the
identifier-passthrough behavior is no longer tied to
`MetadataTable.name()`).
Ran:
build/sbt 'sql/testOnly \
org.apache.spark.sql.execution.command.v2.DescribeTableSuite \
org.apache.spark.sql.execution.command.v2.DescribeViewSuite \
org.apache.spark.sql.connector.DataSourceV2MetadataTableSuite \
org.apache.spark.sql.connector.DataSourceV2MetadataViewSuite'
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes apache#55625 from cloud-fan/describe-table-view-structured-rows.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…n` in 5-minute units
### What changes were proposed in this pull request?
This PR changes `spark.kubernetes.executor.pvc.resizeInterval` to default to `5min` and to accept only `0` or a positive multiple of 5 minutes.
### Why are the changes needed?
Given that PVC expansion is a heavy operation (which typically takes over 1 minute), restricting to 5-minute units (and defaulting to 5 minutes) gives a sensible out-of-the-box configuration once the plugin is registered, and prevents misconfigured intervals that would only add load on the K8s API server.
### Does this PR introduce _any_ user-facing change?
No (unreleased config).
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
Closes apache#55649 from dongjoon-hyun/SPARK-56699.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
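The accepted values for the interval can be stated as a one-line predicate. The sketch below assumes the interval has already been converted to whole minutes; `is_valid_resize_interval` is a hypothetical name, not the config's actual validator.

```python
def is_valid_resize_interval(minutes: int) -> bool:
    """Accept 0 (disabled) or a positive multiple of 5 minutes."""
    return minutes == 0 or (minutes > 0 and minutes % 5 == 0)


for candidate in (0, 5, 10, 1, 7, -5):
    print(candidate, is_valid_resize_interval(candidate))
```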
…`buildKeys`
### What changes were proposed in this pull request?
`DynamicPruningSubquery.canonicalized` now normalizes `buildKeys` relative to `buildQuery.output` using `QueryPlan.normalizeExpressions` instead of calling `.canonicalized` on each key expression independently.
### Why are the changes needed?
The previous implementation called `buildKeys.map(_.canonicalized)`, which canonicalized each key expression in isolation and therefore preserved the original `ExprId` values of attribute references. When two `DynamicPruningSubquery` instances referenced the same logical build query (e.g. different copies of a CTE branch) but with different `ExprId`s, their canonical `buildKeys` differed even though the queries were semantically identical.
`QueryPlan.normalizeExpressions(key, buildQuery.output)` replaces each attribute reference with `ExprId(ordinal)` where `ordinal` is the attribute's position in `buildQuery.output`. Two copies of the same CTE branch will place the same attribute at the same ordinal, so the canonical `buildKeys` become identical regardless of the original `ExprId` values.
### Does this PR introduce _any_ user-facing change?
No. This is an internal canonicalization fix. It may improve query plans by enabling `PlanMerger` to deduplicate more `DynamicPruningSubquery` expressions, but does not change observable query results.
### How was this patch tested?
Added a unit test in `DynamicPruningSubquerySuite` that constructs two `DynamicPruningSubquery` instances with identical build query structure but fresh (distinct) `ExprId`s, and asserts that their `canonicalized` forms are equal.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
Closes apache#55644 from peter-toth/SPARK-56694-fix-dynamicpruningsubquery-canonicalization.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
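A toy model of the ordinal normalization may help. It represents attributes by bare integer expression IDs, which is a deliberate simplification of Catalyst; `normalize_key` is a hypothetical stand-in for the effect of `QueryPlan.normalizeExpressions` on a single attribute reference.

```python
from typing import List


def normalize_key(key_expr_id: int, output_expr_ids: List[int]) -> int:
    """Replace an attribute's expression ID with its position in the output."""
    return output_expr_ids.index(key_expr_id)


# Two copies of the same CTE branch: same shape, fresh expression IDs.
copy_a_output, copy_a_key = [10, 11], 11
copy_b_output, copy_b_key = [20, 21], 21

# Compared in isolation, the keys keep their original IDs and differ.
print(copy_a_key == copy_b_key)  # False

# Normalized against each copy's output, both keys become ordinal 1.
print(normalize_key(copy_a_key, copy_a_output)
      == normalize_key(copy_b_key, copy_b_output))  # True
```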
…s allocator ### What changes were proposed in this pull request? This PR restricts `ExecutorPVCResizePlugin` to the `direct` pods allocator. When `spark.kubernetes.allocation.pods.allocator` is not `direct`, `init()` logs a warning and returns early. ### Why are the changes needed? `ExecutorPVCResizePlugin` patches individual executor PVCs directly. This only works when Spark owns each pod and its PVCs (`direct` allocator). For `statefulset` and `deployment` allocators, PVCs are governed by the controller's `volumeClaimTemplates`, so direct patches conflict with the controller. This mirrors SPARK-56670, which restricted the sibling `ExecutorResizePlugin` for the same reason. - apache#55615 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with new unit tests. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.7) Closes apache#55652 from dongjoon-hyun/SPARK-56702. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…n `PlanMerger`
### What changes were proposed in this pull request?
`PlanMerger` now supports filter propagation through `Join` nodes when merging similar subplans. Previously, when two subplans contained identical `Join` nodes but differed only in a filter applied to one of the join's children, they could not be merged.
This PR adds the ability to propagate such filter conditions through a `Join` and into the parent `Aggregate`'s `FILTER` clause. A new `filterSafeForJoin` helper checks that the filter originates from the non-nullable (preserved) side of the join: the left side of `LeftOuter`/`LeftSemi`/`LeftAnti`, the right side of `RightOuter`, or either side of `Inner`/`Cross`. `FullOuter` joins are not eligible.
The feature is gated by a new SQL config `spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled` (default: `false`).
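The side-eligibility rule described for `filterSafeForJoin` can be written down as a lookup table. The Python below is a paraphrase of that rule for illustration only; the enum names and the `filter_safe_for_join` function are assumptions, and the actual helper is Scala code inside `PlanMerger`.

```python
from enum import Enum


class JoinType(Enum):
    INNER = "Inner"
    CROSS = "Cross"
    LEFT_OUTER = "LeftOuter"
    RIGHT_OUTER = "RightOuter"
    FULL_OUTER = "FullOuter"
    LEFT_SEMI = "LeftSemi"
    LEFT_ANTI = "LeftAnti"


class Side(Enum):
    LEFT = "left"
    RIGHT = "right"


# Sides from which a filter may be propagated, per the description above.
SAFE_SIDES = {
    JoinType.INNER: {Side.LEFT, Side.RIGHT},
    JoinType.CROSS: {Side.LEFT, Side.RIGHT},
    JoinType.LEFT_OUTER: {Side.LEFT},
    JoinType.LEFT_SEMI: {Side.LEFT},
    JoinType.LEFT_ANTI: {Side.LEFT},
    JoinType.RIGHT_OUTER: {Side.RIGHT},
    JoinType.FULL_OUTER: set(),  # never eligible
}


def filter_safe_for_join(join_type: JoinType, side: Side) -> bool:
    """Return True when a filter on `side` comes from the preserved side."""
    return side in SAFE_SIDES[join_type]
```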
### Why are the changes needed?
Without this change, scalar subqueries that differ only in a filter on one side of an identical join cannot be merged, resulting in redundant scans and compute. For example:
SELECT
(SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id),
(SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.b > 1)
Both subqueries scan `t1` and `t2` in full even though they share the same base join. After this change a single merged scan is used and the second subquery's result is derived from it via an aggregate `FILTER` clause.
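To see why one merged pass is equivalent, here is a small Python model of the example. It assumes that `key` comes from `t1` and `b` from `t2` (the query above does not say), uses made-up rows, and ignores SQL's `NULL` result for an empty `sum`.

```python
# Made-up sample data for illustration.
t1 = [{"id": 1, "key": 10}, {"id": 2, "key": 20}, {"id": 3, "key": 30}]
t2 = [{"id": 1, "b": 0}, {"id": 2, "b": 5}, {"id": 3, "b": 2}]


def join(left, right):
    # Naive inner join on id.
    return [{**l, **r} for l in left for r in right if l["id"] == r["id"]]


def separate_subqueries():
    # Each subquery evaluates the join on its own.
    q1 = sum(row["key"] for row in join(t1, t2))
    q2 = sum(row["key"] for row in join(t1, t2) if row["b"] > 1)
    return q1, q2


def merged_subquery():
    # One pass over a single join; the second aggregate is guarded by a
    # condition that plays the role of FILTER (WHERE b > 1).
    total = 0
    filtered = 0
    for row in join(t1, t2):
        total += row["key"]
        if row["b"] > 1:
            filtered += row["key"]
    return total, filtered
```

Both functions return the same pair of sums, but `merged_subquery` evaluates the join only once.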
### Does this PR introduce _any_ user-facing change?
Yes. When `spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled` is set to `true`, the optimizer may merge scalar subqueries that were previously kept separate, reducing the number of scan and join operations.
### How was this patch tested?
Added unit tests in `MergeSubplansSuite`:
- Merge with filter on left inner join child
- Merge with filter on right inner join child
- No merge when both join children have independent filters
- Merge with filter on the preserved side of a `LeftSemi` join
- No merge when filter is on the non-output side of a `LeftSemi` join
- No merge when filter is on the nullable side of an outer join
- No merge when the feature is disabled via config
Added integration test in `PlanMergeSuite` verifying correctness (`checkAnswer`) and plan shape (`SubqueryExec`/`ReusedSubqueryExec` counts) for both the enabled and disabled config cases, with and without AQE.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
Closes apache#55628 from peter-toth/SPARK-56677-filter-propagation-through-join.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? This PR implements row-level CDC post-processing (carry-over removal and update detection) for DSv2 streaming reads. Previously, streaming `changes()` rejected any post-processing with a blanket `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` error. The batch path (added in apache#55508 and apache#55583) uses a Catalyst `Window` keyed by `(rowId, _commit_version)`, which `UnsupportedOperationChecker` rejects on streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`). The streaming rewrite in `ResolveChangelogTable` now expresses the same logic with streaming-allowed primitives: ``` EventTimeWatermark(_commit_timestamp, 0s) -> Aggregate keyed by (rowId..., _commit_version, _commit_timestamp) (count_if delete/insert, [min/max/count rowVersion,] collect_list(struct(*))) -> [Filter on the carry-over predicate] -> Generate(Inline(events)) -> [Project relabeling _change_type for delete+insert pairs] -> Project dropping __spark_cdc_* helpers ``` Including `_commit_timestamp` in the grouping keys is required to satisfy the Append-mode streaming aggregation contract (the watermark attribute must appear among the grouping expressions). By CDC convention all rows in a single commit share `_commit_timestamp`, so this is a no-op semantically relative to the batch `(rowId, _commit_version)` grouping. `deduplicationMode = netChanges` is still rejected -- net change computation partitions by `rowId` alone and reasons over the entire requested range, which is fundamentally cross-batch. The existing error class `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is replaced with the more specific `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`, which now names the offending option and points users at the supported streaming alternatives. 
Doc updates:
- `Changelog.java` clarifies that all rows of a single `_commit_version` must share `_commit_timestamp`, and that streaming reads expect non-decreasing `_commit_timestamp` across micro-batches.
- `Changelog.java` notes that `containsIntermediateChanges()` is range-scoped, hence the streaming limitation for `netChanges`.
- `DataStreamReader.changes()` Scaladoc lists the `netChanges` streaming limitation.
### Why are the changes needed?
Without this PR, any streaming CDC read against a connector that emits CoW carry-over pairs (`containsCarryoverRows = true`) or represents updates as raw delete+insert (`representsUpdateAsDeleteAndInsert = true`) raises an analysis error, forcing users to fall back to batch reads. The batch-only restriction is unnecessary for these passes -- they don't need cross-version state -- and it surprises users since the same options work on batch reads.
### Does this PR introduce _any_ user-facing change?
Yes.
- Streaming `spark.readStream.changes(...)` now supports `computeUpdates = true` and `deduplicationMode = dropCarryovers`. Previously these threw `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED`.
- The error class `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is renamed to `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` with a more specific message. The new error fires only for `deduplicationMode = netChanges` on streaming reads.
- `DataStreamReader.changes()` Scaladoc is updated accordingly.
- `Changelog.java` Javadoc clarifies the `_commit_timestamp` contract for streaming.
### How was this patch tested?
86 tests across 4 CDC suites (all passing):
- `ResolveChangelogTableStreamingPostProcessingSuite` (new, 5 tests) -- plan-shape assertions covering carry-over only, update detection only, both fused, and the no-rewrite pass-through cases. Verifies the `EventTimeWatermark` + `Aggregate` + `Generate(Inline)` rewrite shape.
- `ChangelogResolutionSuite` -- the two existing streaming throw-tests are flipped to plan-shape assertions; a new test covers the `netChanges` streaming throw. - `ResolveChangelogTablePostProcessingSuite` -- the existing streaming throw test is updated to cover the `netChanges`-only case. - `ChangelogEndToEndSuite` -- three new streaming end-to-end tests using `InMemoryChangelogCatalog`: carry-over removal drops CoW pairs, update detection relabels delete+insert as update, and `netChanges` throws. Also confirmed `UnsupportedOperationsSuite` (216 tests) still passes -- the rewritten plan does not contain `Window` or any other streaming-rejected operator. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) Closes apache#55636 from gengliangwang/streamingCDC. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org>
… benchmark coverage ### What changes were proposed in this pull request? Add benchmark coverage for the Parquet vectorized-read decode surface that has none today, plus extend the existing `VectorizedRleValuesReaderBenchmark` to its full public API: - **`ParquetVectorUpdaterBenchmark`** (new) — every `ParquetVectorUpdater` family obtained through `ParquetVectorUpdaterFactory.getUpdater`. Six groups: identity, type-converting, rebase, unsigned, decimal, FixedLenByteArray. - **`VectorizedDeltaReaderBenchmark`** (new) — all three delta decoders (`VectorizedDeltaBinaryPackedReader`, `VectorizedDeltaByteArrayReader`, `VectorizedDeltaLengthByteArrayReader`). Five groups covering bulk read/skip across value distributions and prefix-overlap shapes, plus single-value reads and byte/short/unsigned variants. - **`VectorizedPlainValuesReaderBenchmark`** (new) — every public read/skip method on `VectorizedPlainValuesReader`. Five groups: fixed-size bulk, conversion bulk (unsigned, with-rebase), variable-length, single-value, skip. - **`VectorizedRleValuesReaderBenchmark`** (extended) — three new groups: row-index-filtered reads (with-filter code path), single-value reads, skip paths. ### Why are the changes needed? `ParquetVectorUpdater` and the delta / plain decoders sit on the hot path of every Parquet column read but have no in-repo benchmark coverage. Coverage is intentionally broad — every public read/skip method is included even when it's already memcpy-optimal — so the result files track the long-term performance baseline and future iterative optimization does not have to add benchmark coverage as a precursor. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass Github Actions ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes apache#55558 from LuciferYang/parquet-benchmark-coverage. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…connect-only
### What changes were proposed in this pull request?
Use `pyspark.sql.DataFrame`, not the classic one, in `mlutils.py`.
### Why are the changes needed?
We have a Connect-only CI job that does not even have the classic DataFrame. This util should work with the Connect DataFrame too.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`test_pipeline` and `test_parity_pipeline` passed locally.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#55630 from gaogaotiantian/fix-mlutils.
Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request? This PR completes the DSv2 CDC streaming post-processing surface by implementing `deduplicationMode = netChanges` for streaming reads. The previous PR (apache#55636 / SPARK-56686) added carry-over removal and update detection for streaming but left netChanges batch-only. The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to find the first and last events per row identity, then applies the SPIP collapse matrix on `(existedBefore, existsAfter)`. `Window` is rejected on streaming children (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`), and unlike the row-level passes the netChanges aggregate is keyed by `rowId` only -- there's no commit-version + commit-timestamp grouping that would let us reuse the streaming Aggregate pattern. This PR adds a streaming-friendly equivalent by delegating per-row-identity state management to a new `CdcNetChangesStatefulProcessor` driven by `TransformWithState`: - The processor stores the first event ever observed and the most-recent event observed for each row identity in `ValueState[Row]`. - An event-time timer is armed on each batch to the latest `_commit_timestamp` observed for the key. When the global watermark advances past the timer, `handleExpiredTimer` evaluates the SPIP matrix and emits 0, 1, or 2 output rows -- identical semantics to the batch path. - Existing per-key timers are deleted before re-arming so that out-of-order events for an earlier commit can't fire a stale timer between batches and produce duplicate output for the same row identity. The analyzer rule constructs `TransformWithState` directly (no precedent in catalyst for this; the typed-Dataset DSL is the usual entry point). Encoders for the input/output `Row` and the rowId tuple are built via `ExpressionEncoder(StructType)`. Nested rowId paths (e.g. 
`payload.id`) are handled by aliasing each rowId expression to a top-level `__spark_cdc_rowid_<i>` helper column before the `TransformWithState`, then dropping the helpers in a final `Project` so the user-visible schema matches the connector's declared changelog schema. Plan shape: ``` EventTimeWatermark(_commit_timestamp, 0s) -> Project (alias rowId expressions to flat helper columns) -> TransformWithState (grouping = rowId helpers, EventTime mode, Append) -> SerializeFromObject -> Project (drop rowId helper columns) ``` When carry-over removal / update detection are also requested, the row-level rewrite is applied first; the netChanges `TransformWithState` then sits on top of it and the rule reuses the existing `EventTimeWatermark` rather than stacking another (multi-watermark stacking is rejected unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set). #### Documented limitation Row identities only touched in the latest observed commit are held back until a later commit (with strictly greater `_commit_timestamp`) advances the watermark past them, or the source terminates. End-of-input flushes all timers, so bounded streams produce output equivalent to the corresponding batch read. This matches the steady-state behavior of the row-level streaming rewrite. Also removes the now-obsolete error class `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` introduced in SPARK-56686. ### Why are the changes needed? Without this PR, `deduplicationMode = netChanges` is unavailable on streaming CDC reads, forcing users with intermediate-state connectors (`containsIntermediateChanges = true`) to fall back to batch reads when they want a deduplicated change feed. With SPARK-56686 already shipping carry-over removal and update detection for streaming, netChanges was the only post-processing pass still gated to batch -- this completes the surface. ### Does this PR introduce _any_ user-facing change? Yes. 
- Streaming `spark.readStream.changes(...)` now supports `deduplicationMode = netChanges`. Previously this threw `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`. - That error class is removed; the wording in `DataStreamReader.changes()` and `Changelog.java` Scaladoc has been updated to describe the supported behavior and the latest-commit limitation. Note: the netChanges streaming path uses `TransformWithState`, which requires the RocksDB state store backend (`spark.sql.streaming.stateStore.providerClass = ...RocksDBStateStoreProvider`). Spark surfaces `UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS` if the default HDFS-backed provider is left in place, so this is discoverable. ### How was this patch tested? 89 tests pass across 4 CDC suites (all green): - `ResolveChangelogTableStreamingPostProcessingSuite` -- two new plan-shape tests: `netChanges alone injects watermark + TransformWithState` and `netChanges + carry-over removal share a single watermark` (verifies that the netChanges `TransformWithState` reuses the row-level rewrite's `EventTimeWatermark` rather than stacking another). - `ChangelogResolutionSuite` -- the `netChanges throws` test from SPARK-56686 is flipped to assert that exactly one `TransformWithState` appears in the analyzed plan. - `ResolveChangelogTablePostProcessingSuite` -- the corresponding netChanges throw test is similarly flipped. - `ChangelogEndToEndSuite` -- two new end-to-end tests that drive a streaming query against `InMemoryChangelogCatalog` with the RocksDB state store: `streaming netChanges collapses INSERT then DELETE to no output` (confirms the `(false, false)` cancel case and that end-of-input flushes the latest commit's group) and `streaming netChanges with computeUpdates labels persisting rows as updates` (confirms the `(false, true)` case relabels correctly). Also confirmed `UnsupportedOperationsSuite` (216 tests) still passes. ### Was this patch authored or co-authored using generative AI tooling? 
Generated-by: Claude Code (claude-opus-4-7) Closes apache#55637 from gengliangwang/streamingCDC-netChanges. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org>
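The first-event / last-event collapse described above can be sketched in plain Python. Note the assumptions: the description does not list the cells of the SPIP matrix, so the mapping below is one plausible reading pieced together from the `(false, false)` cancel case and the "0, 1, or 2 output rows" remark; it handles only raw `delete` / `insert` events, ranks `delete` before `insert` within a commit, and processes all events at once instead of keeping per-key state and timers as the streaming processor does. `net_changes` is a hypothetical name.

```python
# Assumed rank: within one commit a delete sorts before an insert.
RANK = {"delete": 0, "insert": 1}


def net_changes(events):
    """Collapse each row identity's history to at most two events."""
    by_row = {}
    ordered = sorted(
        events, key=lambda e: (e["commit_version"], RANK[e["change_type"]])
    )
    for e in ordered:
        by_row.setdefault(e["row_id"], []).append(e)

    out = []
    for row_events in by_row.values():
        first, last = row_events[0], row_events[-1]
        # A leading delete implies the row existed before the range.
        existed_before = first["change_type"] == "delete"
        # A trailing insert implies the row exists after the range.
        exists_after = last["change_type"] == "insert"
        if existed_before:
            out.append(first)  # the row's old image goes away
        if exists_after:
            out.append(last)   # the row's final image appears
    return out
```

With this reading, an insert followed by a delete produces no output, an insert followed by later changes produces a single insert carrying the final values, and a row that existed both before and after produces a delete plus an insert.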
… metadata ### What changes were proposed in this pull request? Follow-up to apache#55636 addressing post-merge review comments from zikangh: 1. **Deduplicate `isCarryoverPair`.** The carry-over predicate (`_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`) was duplicated between the batch path's `addCarryOverPairFilter` and the streaming path's inline filter. Extracted a shared `buildCarryOverPairPredicate` helper and call it from both. 2. **Mark the streaming row-level rewrite via attribute metadata rather than helper column name.** `UnsupportedOperationChecker` previously detected the rewrite by string-matching the `__spark_cdc_events` aggregate alias name. Switched to a metadata marker (`ResolveChangelogTable.streamingPostProcessingMarker`) attached to the alias's output attribute -- mirroring the existing `EventTimeWatermark.delayKey` and `SessionWindow.marker` patterns. The marker travels with the attribute through optimization. 3. **Expand streaming E2E coverage.** New tests in `ChangelogEndToEndSuite`: - composite rowId carry-over removal, - composite rowId update detection (different tuples kept raw), - carry-over + update detection across multiple commits, - DELETE-all-rows and UPDATE-all-rows fixtures, - append-only workload pass-through, - no-op UPDATE labeled as update (rcv differs on pre/post), - large carry-over removal (9 carry-over pairs + 1 real delete). ### Why are the changes needed? zikangh raised these on the merged PR. Bundled together so they can be reviewed and shipped as one follow-up. ### Does this PR introduce _any_ user-facing change? No. Internal refactor (#1, #2) and additional test coverage (#3). The behavior of streaming CDC reads is unchanged. ### How was this patch tested? 
All 157 tests pass across the four CDC suites: - `ChangelogResolutionSuite` - `ResolveChangelogTablePostProcessingSuite` - `ResolveChangelogTableStreamingPostProcessingSuite` - `ChangelogEndToEndSuite` Also confirmed: - `UnsupportedOperationsSuite` (216 tests) still passes after the marker-based detection switch. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-7) Closes apache#55653 from gengliangwang/streamingCDC-followup-zikangh. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit 84d9c84) Signed-off-by: Gengliang Wang <gengliang@apache.org>
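The carry-over predicate quoted in this follow-up (`_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`) and the relabeling of delete+insert pairs can be modelled per `(rowId, _commit_version)` group in plain Python. This is a sketch, not the Catalyst rewrite: events are dictionaries, the output labels `update_preimage` / `update_postimage` are assumed names, and the exact condition used for update detection is not spelled out in the description.

```python
from collections import defaultdict


def _counts(group):
    deletes = sum(1 for e in group if e["change_type"] == "delete")
    inserts = sum(1 for e in group if e["change_type"] == "insert")
    # Mirrors count(rowVersion): only non-null values are counted.
    versions = [e["row_version"] for e in group if e["row_version"] is not None]
    return deletes, inserts, versions


def is_carryover_pair(group):
    deletes, inserts, versions = _counts(group)
    return (
        deletes == 1
        and inserts == 1
        and len(versions) == 2
        and min(versions) == max(versions)
    )


def post_process(events, drop_carryovers=True, compute_updates=True):
    # Group events by (row identity, commit version), keeping arrival order.
    groups = defaultdict(list)
    for e in events:
        groups[(e["row_id"], e["commit_version"])].append(e)

    out = []
    for group in groups.values():
        if drop_carryovers and is_carryover_pair(group):
            continue  # unchanged row copied by a CoW rewrite
        deletes, inserts, _ = _counts(group)
        relabel = compute_updates and deletes == 1 and inserts == 1
        for e in group:
            if relabel:
                # Assumed labels for the two halves of an update.
                label = (
                    "update_preimage"
                    if e["change_type"] == "delete"
                    else "update_postimage"
                )
                out.append({**e, "change_type": label})
            else:
                out.append(e)
    return out
```

A delete+insert pair with equal row versions is dropped, a pair with different row versions is relabeled as an update, and a lone insert passes through unchanged.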
### What changes were proposed in this pull request?
This PR frees up a CI slot for future use.
After some optimization work, the CI slot `pyspark-core, pyspark-errors, pyspark-streaming, pyspark-logger` normally takes just 30 minutes. That wastes a slot, because we can only have 20 concurrent jobs. This PR moves its workload into other slots:
* `pyspark-core, pyspark-errors, pyspark-logger` -> `pyspark-sql, pyspark-resource, pyspark-testing`
* `pyspark-streaming` -> `pyspark-structured-streaming, pyspark-structured-streaming-connect`
* pip test, which used to follow `pyspark-logger` -> now follows `pyspark-pipelines`

We should still be able to keep all the CI slots below 90 minutes most of the time (120 minutes is the limit), and we gain a free slot.
### Why are the changes needed?
We want the new slot for a CI job that tests against an old client (the 4.0 client, for example) to catch backward compatibility issues. We have broken the client multiple times this year and have no tests to gate it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
It's a CI change only.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#55638 from gaogaotiantian/rearrange-pyspark-tests.
Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit cff6c0d)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ee for sbt build
### What changes were proposed in this pull request?
This PR excludes the old `junit:junit` from the dependency tree for the sbt build. This is also another solution for SPARK-56478 (apache#55358).
### Why are the changes needed?
Currently, we use `org.junit.jupiter:junit-jupiter` rather than `junit:junit`, and `junit:junit` is explicitly excluded in `pom.xml`.
https://github.com/apache/spark/blob/8e37824402531f82ad1dfef415b5c29b478df760/pom.xml#L1519
But the dependency tree for the sbt build still includes `junit:junit`.
```
$ build/sbt Test/dependencyTree
...
[info] | +-org.apache.parquet:parquet-column:1.17.0
[info] | | +-com.carrotsearch:junit-benchmarks:0.7.2
[info] | | +-junit:junit:4.13.2
[info] | | | +-org.hamcrest:hamcrest-core:1.3
[info] | | |
[info] | | +-org.apache.commons:commons-lang3:3.17.0 (evicted by: 3.20.0)
[info] | | +-org.apache.commons:commons-lang3:3.20.0
[info] | | +-org.apache.parquet:parquet-common:1.17.0
[info] | | | +-junit:junit:4.13.2
[info] | | | | +-org.hamcrest:hamcrest-core:1.3
```
`junit:junit` also depends on `org.hamcrest:hamcrest-core:1.3`, which causes an issue. If we build Spark using Maven with `-Psparkr`, the pom file for `hamcrest-core:1.3` will be downloaded but the corresponding jar file will not.
```
$ build/mvn -DskipTests -Psparkr package
$ ls ~/.m2/repository/org/hamcrest/hamcrest-core/1.3/
_remote.repositories hamcrest-core-1.3.pom hamcrest-core-1.3.pom.lastUpdated hamcrest-core-1.3.pom.sha1
```
If we then build Spark using sbt in this situation, it will fail.
```
$ build/sbt package
...
[error] lmcoursier.internal.shaded.coursier.error.FetchError$DownloadingArtifacts: Error fetching artifacts:
[error] file:/home/release/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar: not found: /home/release/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar
```
For the same reason, building the Scala/Java API documentation fails.
```
$ cd docs
$ SKIP_RDOC=1 SKIP_PYTHONDOC=1 SKIP_SQLDOC=1 bundler exec jekyll build
...
[error] lmcoursier.internal.shaded.coursier.error.FetchError$DownloadingArtifacts: Error fetching artifacts:
[error] file:/home/release/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar: not found: /home/release/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Confirmed that all of the following commands succeed.
```
$ build/mvn -DskipTests -Psparkr package
$ build/sbt package
$ cd docs && SKIP_RDOC=1 SKIP_PYTHONDOC=1 SKIP_SQLDOC=1 bundler exec jekyll build
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#55645 from sarutak/exclude-old-junit.
Authored-by: Kousuke Saruta <sarutak@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 74ac7f4)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue where `sbt publishLocal` doesn't work. The reason is that `packageDoc/publishArtifact` doesn't work (so `sbt doc` also doesn't work). In the Spark community, API documents are generated using `unidoc`, and I feel having API documents in `~/.m2` or `~/.ivy2` is not so useful. So the solution is to skip publishing API documents by setting `packageDoc / publishArtifact := false`.
In addition, this PR allows developers to configure whether to publish source JARs through the environment variable `PUBLISH_PACKAGE_SRC`. The default value is `false`, which is the same behavior as `mvn install`.
### Why are the changes needed?
Having artifacts in `~/.m2` or `~/.ivy2` is useful for downstream developers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Confirmed artifacts are in both `~/.m2` and `~/.ivy2` by the following command.
```
$ build/sbt publishLocal
```
Also confirmed `*-sources.jar` are in both `~/.m2` and `~/.ivy2` by the following command.
```
$ PUBLISH_PACKAGE_SRC=1 build/sbt publishLocal
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#55624 from sarutak/publishLocal-work.
Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
(cherry picked from commit 4b20e52)
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
…or StringType ### What changes were proposed in this pull request? Tighten the CDC `Changelog` connector contract so that `_commit_version` must be either `LongType` or `StringType`. Previously any `AtomicType` was accepted, which left several edge-case types (`IntegerType`, `TimestampType`, `BinaryType`, `Decimal`, `Float`, `Double`, `Boolean`, ...) silently allowed. - `ChangelogTable.validateSchema` now rejects everything outside `LongType` / `StringType` with a `BIGINT or STRING` expected-type message. - `Changelog` Javadoc updated to state the narrower contract and explain the ordering requirement (the netChanges post-processing path sorts rows by this column, so the column's natural ordering must match commit order). - `CdcNetChangesStatefulProcessor` ordering comment updated; the existing Catalyst-routed comparator is left in place for symmetry with the batch `SortOrder`. - `ChangelogResolutionSuite` updates: accept-list narrowed to `Long` / `String`; reject-list expanded to cover the previously-allowed atomic types (`Integer`, `Timestamp`) plus the existing complex-type cases. ### Why are the changes needed? `Long` (numeric monotonic version) and `String` (lexicographically ordered commit identifier) cover every realistic CDC source. The other atomic types are either strict subsets (`IntegerType` -> `LongType`) or duplicate the role of `_commit_timestamp` (`TimestampType`); types like `BinaryType` / `Float` / `Double` add NaN / boxing / ordering foot-guns with no expressive power gained. The narrower contract also lets the Javadoc state the ordering requirement precisely (matching what the netChanges code actually relies on). Locking down now is non-breaking (no external connectors yet) and keeps the documented surface area small. Relaxing later is non-breaking; restricting later is not. ### Does this PR introduce _any_ user-facing change? 
The `Changelog` connector API is `Evolving` and has no external implementations yet; the restriction only narrows what implementers may return. No user-facing behavior change. ### How was this patch tested? - `ChangelogResolutionSuite` (27 tests) covers the new accept / reject matrix. - `ResolveChangelogTablePostProcessingSuite`, `ResolveChangelogTableStreamingPostProcessingSuite`, `ResolveChangelogTableNetChangesSuite`, `ChangelogEndToEndSuite` -- 98 existing tests still pass on the new contract. - `UnsupportedOperationsSuite` (216 tests) still passes. - `Xdoclint:html,syntax,accessibility` is clean on `Changelog.java`; no new warnings under `Xdoclint:all`. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude opus-4-7 Closes apache#55663 from gengliangwang/SPARK-56711-restrict-commit-version-type. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit ae5c075) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
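A minimal Python model of the narrowed `_commit_version` contract is shown below. It is only a sketch: type names are plain strings, `validate_commit_version_type` is a hypothetical helper, and the error text is illustrative, not the message Spark emits. The last two lines show why the ordering requirement matters for string versions: lexicographic order only matches commit order if the identifiers are formatted accordingly (for example zero-padded).

```python
ALLOWED_COMMIT_VERSION_TYPES = {"LongType", "StringType"}


def validate_commit_version_type(type_name: str) -> None:
    """Raise if the declared type is outside the narrowed contract."""
    if type_name not in ALLOWED_COMMIT_VERSION_TYPES:
        # Illustrative wording only.
        raise ValueError(
            f"_commit_version must be BIGINT or STRING, got {type_name}"
        )


# String versions sort lexicographically, so "10" sorts before "9" ...
unpadded = sorted(["9", "10"])
# ... unless the connector pads them to a fixed width.
padded = sorted(["09", "10"])
```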
…ntrols in Jobs and Stages page
### What changes were proposed in this pull request?
This PR fixes a UI regression in the Spark Web UI (Jobs and Stages pages) where spacing between the data table and the pagination controls ("Page", "Jump to", "Show items") was missing ([SPARK-55810](https://issues.apache.org/jira/browse/SPARK-55810)).
The issue was introduced during the Bootstrap upgrade in apache#54552, which caused the pagination controls to appear visually attached to the table. This change restores appropriate spacing between these elements to improve layout clarity and consistency.
### Why are the changes needed?
Without proper spacing, the table and pagination controls appear cramped and visually misaligned, which negatively impacts readability and overall user experience in the Spark Web UI.
This is a regression caused by the Bootstrap upgrade and should be fixed to maintain UI consistency.
### Does this PR introduce _any_ user-facing change?
Yes.
It improves the visual layout of the Jobs and Stages pages in the Spark Web UI by restoring proper spacing between the table and pagination controls.
### How was this patch tested?
1. Built Spark locally and ran spark-shell
2. Opened the Spark Web UI and navigated to the "Jobs" and "Stages" tabs
3. Verified that spacing between the table and pagination controls is correctly applied
4. Compared behavior before and after the fix to confirm the issue is resolved
### Was this patch authored or co-authored using generative AI tooling?
No.
### Screenshots
| Stages page before the fix | Stages page after the fix |
|---------------|--------------|
| <img width="1512" height="674" alt="Screenshot 2026-05-04 at 2 22 16 PM" src="https://github.com/user-attachments/assets/5d8824d3-0fb9-475f-a739-60bdc36db37d" /> | <img width="1509" height="720" alt="Screenshot 2026-05-01 at 9 28 42 PM" src="https://github.com/user-attachments/assets/e3c0928c-ba43-4a74-aeaf-4d4e93f60ef9" /> |
| Jobs page before the fix | Jobs page after the fix |
|--------------|------------|
| <img src="https://github.com/user-attachments/assets/d70cd94f-d52a-485e-b474-20c1777db3c6" width="100%"/> | <img src="https://github.com/user-attachments/assets/1c242b55-d62a-46f0-8bdb-64abb0ca785e" width="100%"/> |
| <img src="https://github.com/user-attachments/assets/5574d44d-722e-403f-ac73-3db69f495655" width="100%"/> | <img src="https://github.com/user-attachments/assets/20015117-1f5d-44ce-b4c1-07a45c640409" width="100%"/> |
Closes apache#55626 from XdithyX/master.
Authored-by: Adithya Ajith <4adhi007@gmail.com>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
(cherry picked from commit 7866872)
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
…eanup followup ### What changes were proposed in this pull request? This is a follow-up to apache#55482 and contains four bug fixes and two small cleanups in `PlanMerger`: Bug fixes in `PlanMerger`: 1. Tagged `(Filter, Filter)` reuse preserves `mergedChild`'s appended columns: When the reuse check finds an existing `propagatedFilter` alias, the branch now rebuilds the Filter over `mergedChild` (via `cp.withNewChildren(Seq(mergedChild))`) instead of returning `cp` unchanged. If the recursion extended `cp.child`'s output with new columns (e.g. a computed `d = a + b` from a user Project below the Filter), returning `cp` would drop those columns while `npMapping` still pointed into them, leaving the enclosing `Aggregate` with unresolved references. 2. `(np: Filter, cp)` does not duplicate a `cpFilter` already present in `mergedChild`: `cpFilter`, when set, was produced by a deeper `(np, cp: Filter)` (or `(Join, Join)` pass-through) and is already part of `mergedChild`'s output. Appending it a second time via `++ cpFilter.toSeq` duplicated the attribute in the outer Project's projectList. 3. `(np, cp: Filter)` does not duplicate an `npFilter` already present in `mergedChild`: Symmetric to 2. on the `np` side. 4. `(np, cp: Filter)` with a `MERGED_FILTER_TAG`-tagged `cp` drops the tagged Filter: `cp`'s condition is `OR(pf_0, pf_1, ...)` and `cp`'s aggregate expressions already carry individual `FILTER (WHERE pf_i)` clauses. Synthesising a new `propagatedFilter_X = OR(pf_0, pf_1, ...)` would just add `FILTER AND(OR(...), pf_i)` wrapping upstream (simplifying to `FILTER pf_i`) plus a redundant alias in the Project. The branch now drops `cp`'s Filter and returns `cpFilter = None` so `cp`'s aggregates are left untouched. Cleanups in `PlanMerger.merge()`: - Unify the local variable name to `newMergedPlan` across all three branches (was `newMergedPlan` in one and `newMergePlan` in the other two) -- matches the `MergedPlan` case class name. 
- Replace `cache(i).merged` with `mp.merged`; `mp` and `cache(i)` are the same object inside the `collectFirst` pattern. ### Why are the changes needed? Fix 1. is a correctness bug. Fixes 2-4. are plan-shape bugs that produce duplicated attributes or redundant `OR`-of-propagated-filter aliases in the merged plan. The cleanups are minor readability improvements. ### Does this PR introduce _any_ user-facing change? No. All changes are internal to the optimizer; they produce cleaner merged plans for queries that `MergeSubplans` already handled. ### How was this patch tested? Four new tests in `MergeSubplansSuite`, one per fix: - `(np: Filter, cp)` does not duplicate a `cpFilter` already present in mergedChild -- exercises 2. via a `Join` with a `Filter` on the right child, routing a `cpFilter` up through `(Join, Join)` so that `mergedChild.output` already contains the attribute the branch used to re-append. - `(np, cp: Filter)` does not duplicate an `npFilter` already present in mergedChild -- exercises 3., mirror shape on the `np` side. - tagged `(Filter, Filter)` reuse must keep mergedChild's appended columns -- exercises 1. with three subqueries (sq1/sq2 create the tagged structure; sq3's Filter sits above a user Project introducing `d = a + b`, so the `(Filter, Filter)` tagged recursion extends `mergedChild` with `d`). - `(np, cp: Filter)` drops a tagged `cp` Filter without synthesising a redundant alias -- exercises 4. with three subqueries (sq1/sq2 create the tagged structure; sq3 has no filter). ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.7 Closes apache#55659 from peter-toth/SPARK-56570-planmerger-code-cleanup-followup. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Peter Toth <peter.toth@gmail.com> (cherry picked from commit 3ae7da7) Signed-off-by: Peter Toth <peter.toth@gmail.com>
…Table ### What changes were proposed in this pull request? Document the predicate pushdown and column pruning contract for CDC `Changelog` connectors in the `Changelog` Javadoc. When any post-processing pass applies (carry-over removal, update detection, or netChanges): - Spark only pushes predicates that reference `_commit_version`, `_commit_timestamp`, or columns named by `rowId()` to the connector's `SupportsPushDownFilters` / `SupportsPushDownV2Filters`. - Predicates on `_change_type`, the `rowVersion()` column, or data columns are kept above the scan; pushing them would drop one half of a delete/insert pair within a row-identity group and silently break post-processing. When no post-processing pass applies, pushdown is unrestricted. `SupportsPushDownRequiredColumns` (column pruning) is unrestricted in either case — Spark's pruning already respects what the rewrite operators reference, so connectors should serve whatever required-column set Spark requests. The restriction is enforced by the rewrite shape itself: a `Window` / `Aggregate` / `TransformWithState` keyed on the safe columns sits between the relation and the user's filter, so Catalyst's predicate-pushdown rules naturally block unsafe pushes. Connectors do not need to code this restriction themselves but must not bypass it (e.g. by self-applying filters from connector-specific options). This is a sub-task of SPARK-55668. ### Why are the changes needed? The contract was implicit. A connector author reading the Javadoc could reasonably implement `SupportsPushDownFilters` and accept all predicates, including unsafe ones, expecting Spark to handle the rest. Spelling out which predicates the connector actually needs to handle (and why others are intentionally never delivered) prevents accidental misuse and explains the asymmetry to anyone debugging an unexpected post-scan filter. Documenting the column pruning side keeps the two related contracts in one place. 
### Does this PR introduce _any_ user-facing change? Documentation only. No behavior change. ### How was this patch tested? `Xdoclint:html,syntax,accessibility` is clean on `Changelog.java`. No code changed; existing CDC test suites unaffected. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude opus-4-7 Closes apache#55664 from gengliangwang/SPARK-56712-restrict-cdc-pushdown. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit ebfdf8b) Signed-off-by: Gengliang Wang <gengliang@apache.org>
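The pushdown rule described in this commit message can be read as a classification over the columns a predicate references. The following is a minimal Python sketch under stated assumptions: each predicate is modelled as a name plus the set of column names it references, and `split_predicates`, `row_id_columns`, and `post_processing` are hypothetical names for illustration, not Spark APIs. In Spark itself the restriction falls out of the rewrite shape rather than an explicit function like this.

```python
# Metadata columns named in the commit message as safe to push down.
SAFE_METADATA_COLUMNS = {"_commit_version", "_commit_timestamp"}


def split_predicates(predicates, row_id_columns, post_processing):
    """Return (pushed, kept_above_scan) for a list of (name, referenced_columns).

    Hypothetical helper: models the documented contract, not Spark's code.
    """
    if not post_processing:
        # No carry-over removal, update detection, or netChanges: unrestricted.
        return list(predicates), []
    safe = SAFE_METADATA_COLUMNS | set(row_id_columns)
    pushed, kept = [], []
    for name, referenced in predicates:
        # A predicate is pushable only if every column it touches is safe.
        if set(referenced) <= safe:
            pushed.append((name, referenced))
        else:
            kept.append((name, referenced))
    return pushed, kept


predicates = [
    ("version_range", {"_commit_version"}),
    ("by_key", {"id"}),
    ("only_inserts", {"_change_type"}),
    ("mixed", {"id", "amount"}),
]
pushed, kept = split_predicates(predicates, row_id_columns=["id"], post_processing=True)
```

In this sketch the `_change_type` predicate stays above the scan, which is the case the commit message warns about: pushing it would drop one half of a delete/insert pair.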
…aitResult ### What changes were proposed in this pull request? `SparkException` at `ParquetFileFormat$.readParquetFootersInParallel` already has the structured error class `FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER` with SQL state `KD001`, but `SparkThreadUtils.awaitResult` wraps it in a generic `SparkException`, hiding the SQL state. This PR: 1. **Adds `preserveSparkThrowable` parameter** to `SparkThreadUtils.awaitResult`, `ThreadUtils.awaitResult`, and `ThreadUtils.parmap` (default `false` — no behavior change for existing callers) 2. **Updates `ParquetFileFormat.readParquetFootersInParallel`** to pass `preserveSparkThrowable = true` to `parmap`, preserving the structured error class ### Files changed: - `SparkThreadUtils.scala` — new 3-param `awaitResult` overload - `ThreadUtils.scala` — new 3-param `awaitResult` overloads (Awaitable + JFuture) + `parmap` flag - `ParquetFileFormat.scala` — caller passes `preserveSparkThrowable = true` to `parmap` - `ThreadUtilsSuite.scala` — tests for `awaitResult` and `parmap` preservation - `ParquetFileFormatSuite.scala` — updated test to verify error class is thrown directly ### Why are the changes needed? When reading parquet footers in parallel, the `FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER` error class is wrapped in a generic `SparkException` by `awaitResult`, losing the structured SQL state. This makes it harder for users and applications to programmatically handle specific error conditions. ### Does this PR introduce _any_ user-facing change? Yes. `ParquetFileFormat.readParquetFootersInParallel` now throws the structured `SparkException` directly instead of wrapping it in a generic `SparkException`. The error class and SQL state are preserved. ### How was this patch tested? 1. Added unit tests in `ThreadUtilsSuite` for `awaitResult` (Awaitable and JFuture) and `parmap` with `preserveSparkThrowable` flag true/false 2. 
Updated `ParquetFileFormatSuite` to verify the error class is thrown directly ### Was this patch authored or co-authored using generative AI tooling? Yes. Closes apache#54846 from linhongliu-db/preserve-spark-throwable. Authored-by: Linhong Liu <linhong.liu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit faf17bd) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
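The effect of the `preserveSparkThrowable` flag can be illustrated with a small Python analogy. This is a sketch only: `StructuredError`, `GenericError`, and `await_result` are hypothetical stand-ins, not the Scala `SparkThreadUtils` API, and the wrapping message is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


class StructuredError(Exception):
    """Stand-in for an exception that carries an error class and SQL state."""

    def __init__(self, message, error_class, sql_state):
        super().__init__(message)
        self.error_class = error_class
        self.sql_state = sql_state


class GenericError(Exception):
    """Stand-in for the generic wrapper that hides the structured details."""


def await_result(future, preserve_structured=False):
    try:
        return future.result()
    except Exception as exc:
        if preserve_structured and isinstance(exc, StructuredError):
            raise  # rethrow unchanged so callers still see the SQL state
        raise GenericError("Exception thrown in awaitResult") from exc


def read_footer():
    raise StructuredError(
        "cannot read footer", "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER", "KD001"
    )


def outcome(preserve):
    """Run read_footer on a worker thread and report what the caller observes."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(read_footer)
        try:
            await_result(future, preserve_structured=preserve)
        except Exception as exc:
            return type(exc).__name__, getattr(exc, "sql_state", None)
```

With the flag off, the caller sees only the generic wrapper; with it on, the error class and SQL state reach the caller directly.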
### What changes were proposed in this pull request?
Add a new expression, `is_valid_variant`, which returns true if the variant is valid and false if it is malformed.
### Why are the changes needed?
This is useful when the data source is not completely known. Other variant expressions throw a `MALFORMED_VARIANT` exception if any row is malformed, which is inconvenient when only a subset of rows contains malformed variants. The new expression returns a boolean instead of throwing an exception.
### Does this PR introduce _any_ user-facing change?
Yes. A new SQL expression is added.
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#55599 from chenhao-db/is_valid_variant.
Authored-by: chenhao-db <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 109a130)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…as in ResolveReferences
## What changes were proposed in this pull request?
Wrap outer star expansion results in `Alias` in `ResolveReferences.buildExpandedProjectList`. When a targeted star (e.g. `t1.*`) inside a scalar subquery resolves from the outer scope, each expanded attribute is now wrapped in `Alias(OuterReference(attr), name)` instead of a bare `OuterReference(attr)`.
For struct star expansion (e.g. `t1.s.*`), the expansion already produces `Alias(GetStructField(...), fieldName)`. In that case we wrap only the inner child with `OuterReference`, preserving the existing `Alias` to avoid double aliasing.
## Why are the changes needed?
Without the `Alias`, the `OuterReference` attribute's `ExprId` leaks into the subquery scope via `Project.output` (since `OuterReference.toAttribute` strips the wrapper). When a derived table wraps the outer star expansion, downstream operators (e.g. `SELECT *`) reference this leaked `ExprId` directly, which can cause issues with expression ID tracking. Wrapping in `Alias` gives each outer reference a fresh `ExprId` in the subquery's scope.
Example:
```
SELECT (SELECT * FROM (SELECT t1.* FROM VALUES(2) AS t2(col1) LIMIT 1)) FROM VALUES(1) AS t1(col1)
```
## Does this PR introduce any user-facing change?
No.
## How was this patch tested?
Added SQL golden file tests in `scalar-subquery-select.sql`:
- Outer star expansion (`t1.*` from outer scope)
- Outer struct star expansion (`t1.s.*`)
- Untargeted star does NOT expand from outer scope
- Inner scope wins when both match
- Outer star through derived table wrapper

Also fixed a pre-existing missing semicolon in the test file.
## Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes apache#55606 from mihailotim-db/mihailotim-db/always_alias_outer_ref.
Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 3b8dead)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…hould mirror getOrCreate path as much as possible ### What changes were proposed in this pull request? As titled this is a minor hygiene improvement which brings the create and getOrCreate codepaths closer together. Prior to this PR, the create codepath would always create a new SparkConf and pass that to SparkContext.getOrCreate. When a SparkSession/Context already exists, the creation of the SparkConf is not required at all and the SparkContext can be fetched from the instantiated session. Note that this change only modifies SparkSessionBuilder.create in PySpark classic which was recently added [here](apache#53820). ### Why are the changes needed? Code hygiene ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? We mainly rely on existing tests for the create codepath. We also add a test to verify that a SparkConf is not created when there is an existing session. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#54429 from jonmio/jonmio-unify-create-and-getOrCreate. Authored-by: Jon Mio <16511957+jonmio@users.noreply.github.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit f5ca55c) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…en sql-related modules are modified ### What changes were proposed in this pull request? This PR marks `pipelines` as depending on the `sql` module in Spark's test module graph, and updates the existing doctest expectations for the affected module closure. ### Why are the changes needed? The `pipelines` project depends on Spark SQL, but the test module graph treated it as independent. As a result, changes in SQL-related modules such as `sql`, `catalyst`, and `sql-api` did not select `pipelines/test` in the affected-module CI path. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ```bash python3 -m py_compile dev/sparktestsupport/modules.py dev/sparktestsupport/utils.py PYTHONPATH=dev python3 -m doctest dev/sparktestsupport/utils.py git diff --check ``` Also verified locally that changes in `core`, `sql-api`, `catalyst`, `sql`, and `sql/pipelines` include the `pipelines` module in the affected-module selection. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: OpenAI Codex Closes apache#56405 from LuciferYang/fix-pipelines-module-deps. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 6d4b71e) Signed-off-by: yangjie01 <yangjie01@baidu.com>
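The affected-module selection this commit fixes amounts to a transitive closure over "depends on" edges. Here is a minimal Python sketch of that idea, assuming a toy dependency graph; the edges and the `affected_modules` helper are illustrative and are not the actual contents of `dev/sparktestsupport/modules.py`.

```python
def affected_modules(changed, dependencies):
    """Return the changed modules plus everything that transitively depends on them.

    `dependencies` maps a module name to the set of modules it depends on.
    """
    affected = set(changed)
    grew = True
    while grew:
        grew = False
        for module, deps in dependencies.items():
            if module not in affected and deps & affected:
                affected.add(module)
                grew = True
    return affected


# Toy graph before the fix: pipelines declares no dependency on sql.
before = {"catalyst": {"sql-api"}, "sql": {"catalyst"}, "pipelines": set()}
# Toy graph after the fix: pipelines depends on sql.
after = {**before, "pipelines": {"sql"}}

selected_before = affected_modules({"catalyst"}, before)
selected_after = affected_modules({"catalyst"}, after)
```

In the `before` graph a change to `catalyst` never reaches `pipelines`, which matches the missed test selection described above.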
…nnect session is closing ### What changes were proposed in this pull request? This PR fixes a race condition between registering a newly started streaming query and the owning Connect session being closed. The race strands two kinds of per-session resources, so the fix touches two server-side caches with the same publish-then-recheck pattern. #### 1. `SparkConnectStreamingQueryCache` (the query cache) When a Connect session is closed (`SessionHolder.close()`), it stops all of the session's streaming queries via `SparkConnectStreamingQueryCache.cleanupRunningQueries()`, which iterates over the query cache. As the existing code comment in `close()` notes, *"there can be concurrent streaming queries being started."* A query that finishes `DataStreamWriter.start()` on another thread and is registered **after** that iteration is missed by the cleanup. It is left in the cache as an active entry holding a strong reference to the now-closed session, and is never stopped — so the driver cannot exit. The fix makes `registerNewStreamingQuery` coordinate with session shutdown without introducing additional locking: 1. **`SessionHolder.isClosing`** — a new `private[connect]` accessor exposing `closedTimeMs.isDefined`. `closedTimeMs` is set at the very start of `close()`, before any session resources (including streaming queries) are cleaned up, so it is a reliable "session is shutting down" signal. 2. **Publish-then-recheck in `registerNewStreamingQuery`** — after the query is inserted into the cache, we re-check `sessionHolder.isClosing`. If the session is closing, we stop the query (asynchronously, so we don't block the caller) and drop the entry. 
Because `closedTimeMs` is set before `cleanupRunningQueries()` runs, and both the cache publish and `closedTimeMs` are volatile, every interleaving is covered: - if we observe the session as closing, we stop and drop the query here; - otherwise `close()` has not set `closedTimeMs` yet, so its `cleanupRunningQueries()` runs after our cache insertion and observes the entry we just published. `StreamingQuery.stop()` is idempotent and `isActive`-guarded, so both sides firing is harmless. 3. **Identity-based, stop-then-remove async cleanup** — when the recheck fires, we stop the query and remove the cache entry on a `Future` (since `stop()` may block). Two subtleties: - **Stop before remove.** We drop the entry only *after* the query has actually stopped. Removing it first would discard the only server-side handle to a query that might still be running, re-introducing the very leak this guards against. If `stop()` throws, we leave the entry cached so `periodicMaintenance` (and `cleanupRunningQueries`) can still find and reap it once the query becomes inactive. - **Match by query identity, not value equality.** Removal uses `queryCache.computeIfPresent` and only nulls the entry when `current.query eq query`. A plain `queryCache.remove(queryKey, value)` by `QueryCacheValue` case-class equality would *fail to remove* the entry if the maintenance thread had concurrently rewritten its `expiresAtMs` after observing the just-stopped query (the value no longer equals the one we inserted). Identity matching removes our entry regardless of such rewrites, while still never evicting a later replacement registered for the same key. 4. **`isActive` check at insertion time** — the new entry's `expiresAtMs` is now derived from `query.isActive`. A query that is already inactive at registration time (e.g. 
a `Trigger.AvailableNow` query that already finished, or one stopped right after `start()`) gets an expiry time immediately, instead of lingering as a falsely "active" entry until a later maintenance cycle notices it stopped. #### 2. `StreamingForeachBatchHelper.CleanerCache` (the foreachBatch runner cache) The same shutdown window applies to the Python `foreachBatch` runner cleaner, which is registered (via `registerCleanerForQuery`) immediately after the query is registered. `close()` reaps runners through `CleanerCache.cleanUpAll()`, and the `onQueryTerminated` listener is the other reaper. A cleaner registered for a query started concurrently with `close()` can be missed by `cleanUpAll()`, and if its query already terminated it can also be missed by the listener — stranding a Python worker, the same class of leak as the query case. This PR applies a symmetric guard: 1. **Pre-insert fast path** — if `sessionHolder.isClosing` is already true on entry, we close the runner immediately and return without registering anything (in particular without adding the listener; see below). 2. **Post-insert recheck** — after inserting the cleaner, we re-check `isClosing` and, if the session started closing in the meantime, clean the runner up here and drop the listener. 3. **Listener-lifecycle rework** — the runner clean-up listener was a `lazy val` initialized on first query and never removed. Crucially, `SessionHolder.close()` does **not** remove this listener (it is not tracked in the session's `listenerCache`), so a listener left registered keeps the `CleanerCache` / `SessionHolder` reachable after the session is closed. 
It is now a `var` managed by `ensureListenerRegistered()` / `removeListenerIfRegistered()`, both guarded by `this`: - `cleanUpAll()` removes the listener after reaping the runners; - the post-insert recheck removes it too (the recheck may have just re-added it after `cleanUpAll()` ran); - the field is **recoverable** — a later registration re-adds a fresh listener, so `cleanUpAll()` does not permanently disable the cache if it is ever reused. Today `cleanUpAll()` is only called on the close path (after which registration fast-paths on `isClosing`), but correctness no longer depends on that. - `listenerForTesting` now reads the field under the same lock, so concurrent tests see a consistent value rather than a stale/torn read. ### Why are the changes needed? The race strands streaming queries: they keep running, hold a reference to a closed session, and are never reaped. In production this manifested as Connect structured-streaming jobs in multi-task workflows that complete their computation but never exit — the driver sits idle (0% CPU) for many hours until a manual cancellation or a much longer infra-level cleanup timeout, incurring significant unnecessary cost. The foreachBatch-cleaner guard closes the symmetric leak for Python `foreachBatch` workers. Fixing the registration/shutdown race lets the session-close path reliably account for every query and runner so the driver can terminate normally. ### Does this PR introduce _any_ user-facing change? No. This fixes an internal resource-cleanup race in the Spark Connect server. There is no API or behavior change visible to users other than streaming queries (and their Python `foreachBatch` workers) no longer being stranded when a session is closed concurrently with a query starting. ### How was this patch tested? Added 7 unit tests across two suites. 
`SparkConnectStreamingQueryCacheSuite`: - `"Query registered when the session is already closing is stopped and dropped"` - `"Query registered for a closing session is retained when stopping it fails"` - `"Query registration racing with session shutdown leaves no query running"` (200-iteration race test) `StreamingForeachBatchHelperSuite`: - `"CleanerCache: a runner registered for a closing session is cleaned up immediately"` - `"CleanerCache.cleanUpAll unregisters the streaming listener"` - `"CleanerCache: listener is recoverable -- re-registered after cleanUpAll"` - `"CleanerCache: registration racing with session shutdown strands no runner or listener"` (200-iteration race test) The existing happy-path tests continue to pass with the `isActive`-based expiry change (an active query is still cached with no expiry). ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Opus 4.8) Closes apache#56377 from dbtsai/spark-connect-fix. Lead-authored-by: DB Tsai <dbtsai@dbtsai.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: DB Tsai <dbtsai@dbtsai.com> (cherry picked from commit a852aa3) Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
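The publish-then-recheck pattern described in this commit message can be sketched in a few lines of Python. This is a single-threaded illustration that replays the two interleavings one after the other; `Session`, `Query`, and `register` are hypothetical names. The real change relies on volatile reads and a concurrent map, and stops the query asynchronously, none of which is modelled here.

```python
class Query:
    def __init__(self):
        self.active = True

    def stop(self):
        # Idempotent: stopping an already stopped query is harmless.
        self.active = False


class Session:
    def __init__(self):
        self.is_closing = False
        self.queries = {}

    def register(self, key, query):
        self.queries[key] = query          # 1. publish the entry
        if self.is_closing:                # 2. recheck the shutdown flag
            query.stop()                   # 3. stop first ...
            if self.queries.get(key) is query:
                del self.queries[key]      # ... then remove, matching by identity

    def close(self):
        self.is_closing = True             # set before any cleanup starts
        for query in list(self.queries.values()):
            query.stop()


# Interleaving A: registration completes before close() starts.
session_a, early = Session(), Query()
session_a.register("q", early)
session_a.close()

# Interleaving B: close() has already started when the query is registered.
session_b, late = Session(), Query()
session_b.close()
session_b.register("q", late)
```

Either the recheck sees the closing flag and cleans up itself, or the flag was not yet set and the later cleanup pass sees the published entry, so no query is left running in either order.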
### What changes were proposed in this pull request?
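`getattr(chained_func, "__module__", "").startswith("pyspark.sql.worker.")` raises an exception when `chained_func.__module__` is `None`, because the attribute exists and `getattr` returns `None` rather than the default. So we check the attribute value before calling `startswith`.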
### Why are the changes needed?
In some rare cases, `chained_func.__module__` can be `None` - for example, if the function is created with `compile` or `exec`. We need to deal with that.
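The failure mode is easy to reproduce in plain Python. The sketch below is illustrative only: `is_worker_function` shows one possible guard and is not necessarily the check this patch adds, and the `None` module is set by hand to stand in for a function created with `compile` or `exec`.

```python
WORKER_PREFIX = "pyspark.sql.worker."


def is_worker_function_unsafe(func):
    # The default "" is used only when the attribute is missing,
    # not when it exists with the value None.
    return getattr(func, "__module__", "").startswith(WORKER_PREFIX)


def is_worker_function(func):
    # Hypothetical guard: accept only real strings before calling startswith.
    module = getattr(func, "__module__", None)
    return isinstance(module, str) and module.startswith(WORKER_PREFIX)


def udf(x):
    return x


# Simulate a function whose __module__ is None.
udf.__module__ = None

try:
    is_worker_function_unsafe(udf)
    unsafe_failed = False
except AttributeError:
    unsafe_failed = True

safe_result = is_worker_function(udf)
```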
### Does this PR introduce _any_ user-facing change?
Yes. This fixes a bug that occurs when users run profilers against UDFs whose `__module__` is `None`.
### How was this patch tested?
A regression test is added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#56416 from gaogaotiantian/fix-datasource-profiler.
Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Tian Gao <gaogaotiantian@hotmail.com>
(cherry picked from commit 9eb44b3)
Signed-off-by: Tian Gao <gaogaotiantian@hotmail.com>
…ite by returning live table from SharedTablesInMemoryRowLevelOperationTableCatalog ### What changes were proposed in this pull request? Override loadTable in SharedTablesInMemoryRowLevelOperationTableCatalog to return the live table instance instead of a snapshot copy. SPARK-56995 (apache#56121) introduced a loadTable override in InMemoryRowLevelOperationTableCatalog that returns a deep copy of the table on every call. This is needed for DSv2 Transaction API cache validation semantics, but it breaks TRUNCATE TABLE and DROP TABLE IF EXISTS for tests using the shared catalog variant — the mutation is applied to a disposable copy, leaving the live table's data intact. ### Why are the changes needed? Two tests in `AutoCdcScd1FullRefreshSuite` have been failing since SPARK-56995 was merged: - "full refresh wipes target rows and the auxiliary table for the refreshed flow" - "selective full refresh wipes only the requested target's auxiliary state" https://github.com/apache/spark/actions/runs/27166004427/job/80196384953 The failures were not caught in SPARK-56995's CI because the pipelines module does not declare a dependency on sql or catalyst, so its tests were not triggered by changes under sql/catalyst/. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Confirmed all 3 tests in `AutoCdcScd1FullRefreshSuite` and all 44 tests in `MergeIntoDataFrameSuite` passed. ### Was this patch authored or co-authored using generative AI tooling? Kiro CLI / Claude Closes apache#56378 from sarutak/fix-autocdc-full-refresh-suite. Authored-by: Kousuke Saruta <sarutak@amazon.co.jp> Signed-off-by: Kousuke Saruta <sarutak@apache.org> (cherry picked from commit 44984a6) Signed-off-by: Kousuke Saruta <sarutak@apache.org>
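The difference between returning a snapshot copy and returning the live table can be shown with a small Python sketch. The class and method names here are hypothetical stand-ins for the in-memory test catalogs and only model the copy-versus-live behavior described above.

```python
import copy


class Table:
    def __init__(self, rows):
        self.rows = list(rows)

    def truncate(self):
        self.rows.clear()


class SnapshotCatalog:
    """Returns a deep copy on every load, like the snapshot-returning override."""

    def __init__(self):
        self.tables = {}

    def load_table(self, name):
        return copy.deepcopy(self.tables[name])


class SharedCatalog(SnapshotCatalog):
    """Returns the live instance, like the override added for the shared variant."""

    def load_table(self, name):
        return self.tables[name]


snapshot = SnapshotCatalog()
snapshot.tables["t"] = Table([1, 2, 3])
snapshot.load_table("t").truncate()   # mutates a disposable copy

shared = SharedCatalog()
shared.tables["t"] = Table([1, 2, 3])
shared.load_table("t").truncate()     # mutates the live table
```

The first `truncate` has no lasting effect, which is the symptom the two failing full-refresh tests observed.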
…hdown via a dialect string-literal escaping hook ### What changes were proposed in this pull request? Building a pushed-down `LIKE` predicate for `STARTS_WITH` / `ENDS_WITH` / `CONTAINS` involves two distinct escaping layers: 1. **LIKE meta-char escaping** -- escape `_`, `%`, and the escape char `\` itself, using `\` as the LIKE escape character. This is what `V2ExpressionSQLBuilder.escapeSpecialCharsForLikePattern` does, and it is dialect-independent. 2. **SQL string-literal escaping** -- make the resulting pattern text survive the target database's string-literal parser. This is dialect-specific: standard SQL only doubles `'`, but MySQL also processes `\` inside string literals. The base builder only handled layer 1 and assumed a trivial layer 2 (hand-wrapping the value in `'...'`). MySQL is the one dialect whose layer 2 differs, so it had to override all three `visitStartsWith`/`visitEndsWith`/`visitContains` methods purely to emit `ESCAPE '\\'`. This PR separates the two layers: - Adds a protected, overridable hook `escapeStringLiteralForLikePattern(String)` to `V2ExpressionSQLBuilder`. It defaults to the identity function, so the generated SQL for every standard-SQL dialect is **byte-for-byte unchanged**. - The shared `visitStartsWith`/`visitEndsWith`/`visitContains` now route both the LIKE pattern and the `\` escape character through this hook (via a small `likeWithEscape` helper), so the LIKE escape character is defined in one place. - `MySQLDialect` now overrides **only** `escapeStringLiteralForLikePattern` (doubling backslashes for MySQL's string-literal layer) and **deletes** its three duplicated visit-method overrides. This is a follow-up to the design issue on apache#56350 (SPARK-57287). ### Why are the changes needed? SPARK-57287 fixed backslash escaping in `escapeSpecialCharsForLikePattern` (layer 1), which is correct for standard-SQL dialects. 
But MySQL reuses that base method and adds an extra string-literal unescaping layer: it treats `\` as an escape character inside string literals (this is exactly why `MySQLDialect` already wrote `ESCAPE '\\'` rather than `ESCAPE '\'`, per SPARK-48172). MySQL's string-literal parser collapses the single backslash doubling back to one `\`, so the pushed-down pattern for a value such as `startsWith("abc\")` resolved to `abc\%` -> `abc` followed by a literal `%`, returning silently wrong results. Concretely, before this PR, against MySQL: ``` spark.table("mysql_catalog.db.t").filter($"c".startsWith("abc\\")) // pushed: c LIKE 'abc\\%' ESCAPE '\\' -> after MySQL string-literal parsing: c LIKE 'abc\%' ESCAPE '\' // matches "abc" + literal "%", NOT values starting with "abc\" ``` The current coarse override surface (reimplement the whole visit method) is also what let SPARK-48172 fix the `ESCAPE` clause but miss the pattern body. Decoupling the two layers fixes the MySQL backslash case and removes the duplicated visit methods, so a dialect only needs to override the visit methods when its `LIKE` matching semantics genuinely differ -- never just for escaping. ### Does this PR introduce _any_ user-facing change? Yes. For the MySQL JDBC dialect, `startsWith` / `endsWith` / `contains` predicates on values containing a backslash now push down a correct `LIKE` pattern and return correct results instead of silently wrong results. There is no change for any other dialect (the new hook is identity by default, and the generated SQL is unchanged). ### How was this patch tested? Added a unit test in `JDBCSuite` that compiles `STARTS_WITH` / `ENDS_WITH` / `CONTAINS` predicates with both the default dialect and `MySQLDialect`, asserting the generated SQL for backslash values (and that wildcard escaping is preserved). The default-dialect output is unchanged; the MySQL output now doubles backslashes through the string-literal layer. 
The existing H2-based coverage in `JDBCV2Suite` (added by SPARK-57287) continues to exercise the standard-SQL path end-to-end. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8) Closes apache#56384 from cloud-fan/decouple-like-pattern-escaping. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit f5eabcb) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
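The two escaping layers can be sketched in Python to make the backslash counts concrete. This is a simplified model, not the Java `V2ExpressionSQLBuilder` code: the function names are hypothetical, and single-quote handling is left out to keep the focus on backslashes.

```python
def escape_like_meta(value):
    """Layer 1: escape \\, _ and % using \\ as the LIKE escape character."""
    out = []
    for ch in value:
        if ch in "\\_%":
            out.append("\\")
        out.append(ch)
    return "".join(out)


def standard_literal(text):
    """Layer 2 for standard SQL in this sketch: identity (quotes not handled)."""
    return text


def mysql_literal(text):
    """Layer 2 for MySQL: double backslashes so they survive literal parsing."""
    return text.replace("\\", "\\\\")


def starts_with_sql(column, value, escape_literal):
    # Both the pattern and the escape character pass through the same hook.
    pattern = escape_literal(escape_like_meta(value) + "%")
    escape_char = escape_literal("\\")
    return f"{column} LIKE '{pattern}' ESCAPE '{escape_char}'"


value = "abc\\"  # the three letters abc followed by one backslash
standard_sql = starts_with_sql("c", value, standard_literal)
mysql_sql = starts_with_sql("c", value, mysql_literal)
print(standard_sql)
print(mysql_sql)
```

For the value `abc\`, the standard form carries two backslashes in the pattern and the MySQL form carries four, so that MySQL's string-literal parser reduces it back to the two that the LIKE layer expects.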
…ge codegen is disabled
### What changes were proposed in this pull request?
Update the `number of output rows` metric on the code path that does not use whole-stage codegen (WSCG).
### Why are the changes needed?
The `number of output rows` metric is not incremented when WSCG is off. Reporting `numOutputRows` there is useful and keeps the behavior consistent with the WSCG path.
### Does this PR introduce _any_ user-facing change?
Yes. The `number of output rows` metric now reports the correct value when WSCG is off.
### How was this patch tested?
New test case added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#56363 from jiwen624/SPARK-57313.
Authored-by: Eric Yang <jiwen624@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f2d11a6)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Brings in 260 upstream commits from apache/spark v4.2.0-preview5..v4.2.0-rc1.

Conflict resolutions:
- pom.xml (45), docs/_config.yml, python/pyspark/version.py: kept the downstream version string 4.2.0.1-4.3.0-0 (ours), preserving rc1's other auto-merged content.
- .github/workflows/*.yml (10): kept deleted (downstream uses a single ci.yml; these Apache workflows were removed in NGSOK-1622).
- DropTableExec.scala: combined downstream purge-on-external (isPurgeableExternalTable) with rc1's ViewCatalog-aware not-exists branch.
- ddl.scala (DropTableCommand): kept downstream's hoisted `val table` (reused for isPurgeableExternalTable) and adopted rc1's CatalogTable.isViewLike(t) match to also cover METRIC_VIEW.
… in JDBCSuite and V2ExpressionSQLBuilder
### What changes were proposed in this pull request?
This PR fixes a style issue introduced by apache#56384. Some lines in `JDBCSuite.scala` and `V2ExpressionSQLBuilder.java` exceed 100 characters in length.
### Why are the changes needed?
To recover CI.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Claude
Closes apache#56441 from sarutak/decouple-like-pattern-escaping.
Lead-authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
(cherry picked from commit 60acc8f)
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
214c045 to fbf9f21
…vel when writing Arrow batches ### What changes were proposed in this pull request? This PR fixes a bug where the zstd compression level configured via `spark.sql.execution.arrow.compression.zstd.level` was silently ignored everywhere Arrow batches are compressed. Three places shared the same broken pattern: - `ArrowConverters.ArrowBatchIterator` (SPARK-54134) - `PythonArrowInput` (SPARK-54226; also covers `GroupedPythonArrowInput`, which reuses this codec via SPARK-55328) - `CoGroupedArrowPythonRunner` (SPARK-54226) They constructed `new ZstdCompressionCodec(level)` only to read its codec type, then rebuilt the codec through `CompressionCodec.Factory.INSTANCE.createCodec(codecType)`. The codec type enum does not carry a level, so that single-argument factory overload always builds a codec at the zstd default level (3), dropping the configured one. The codec construction is extracted into a shared `ArrowCompressionUtils.createCompressionCodec` helper that constructs the level-carrying codec instance directly (the helper lives in `sql/core` because `sql/api`, where `ArrowUtils` is, has no `arrow-compression` dependency). The level only matters on the write side; the read side looks up the codec by the type recorded in the IPC message, so reads are unaffected and the on-wire format is unchanged. The same bug class was found by dbtsai during review of apache#56334 (apache#56334 (comment)); that PR fixes the cache-side instance of the pattern, and this PR fixes the remaining three pre-existing instances. ### Why are the changes needed? Users tuning `spark.sql.execution.arrow.compression.zstd.level` for Python UDF exchange or `df.toArrow()` got no effect at all: every level compressed identically at the default level 3, with no error or warning. ### Does this PR introduce _any_ user-facing change? Yes. The configured zstd level now actually takes effect; previously all levels behaved like the default level 3. 
The bug exists in released Spark 4.1.0/4.1.1/4.1.2 (SPARK-54134 and SPARK-54226 were backported to branch-4.1) as well as 4.2.0 RCs and master, so this fix is a candidate for backporting to branch-4.1 and branch-4.2. Note that a branch-4.1 backport needs to fix a fourth copy of the pattern: there `GroupedPythonArrowInput` still has its own codec construction, since the SPARK-55328 deduplication is master-only. ### How was this patch tested? New `ArrowCompressionUtilsSuite`. The regression test compresses the same compressible-but-varying batch at zstd level -5 and level 19 and asserts level 19 produces a strictly smaller payload. Against the old codec construction this test fails with byte-identical sizes at both levels (verified locally). A second test covers the `none` codec and the unsupported-codec error. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes apache#56444 from viirya/fix-arrow-zstd-level. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit e33017a) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
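The bug pattern described above, building a level-carrying codec only to read its type and then rebuilding from the type, can be modelled in a few lines of Python. This is a sketch with hypothetical names (`ZstdCodec`, `create_codec_from_type`); it does not use the Arrow Java API and performs no real compression. The default level of 3 is taken from the commit message.

```python
from dataclasses import dataclass

ZSTD_DEFAULT_LEVEL = 3


@dataclass(frozen=True)
class ZstdCodec:
    level: int = ZSTD_DEFAULT_LEVEL
    codec_type: str = "zstd"


def create_codec_from_type(codec_type):
    """Factory keyed only on the codec type, which carries no level."""
    if codec_type == "zstd":
        return ZstdCodec()  # always the default level
    raise ValueError(f"unsupported codec: {codec_type}")


def create_codec_broken(level):
    # Old pattern: the configured level is dropped on the round trip via the type.
    return create_codec_from_type(ZstdCodec(level).codec_type)


def create_codec_fixed(level):
    # New pattern: construct the level-carrying codec directly.
    return ZstdCodec(level)


broken = create_codec_broken(19)
fixed = create_codec_fixed(19)
```

Because the round trip goes through a value that cannot represent the level, every configured level ends up at the default, with no error or warning.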
…ty_listener
### What changes were proposed in this pull request?
Catch `AnalysisException` for the test and ignore it because it's acceptable (table not created yet).
### Why are the changes needed?
The test has been flaky on the Build / Python-only (master, Python 3.12, MacOS26) scheduled workflow:
- 2026-05-23: https://github.com/apache/spark/actions/runs/26346300968/job/77556662680
- 2026-05-25: https://github.com/apache/spark/actions/runs/26423905857/job/77783724134
Both failed with `AnalysisException: TABLE_OR_VIEW_NOT_FOUND` on `listener_terminated_events`: the `onQueryTerminated` callback fires asynchronously after `q.stop()` returns and writes the table via `saveAsTable`, but on slower macOS runners the read races the write.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Local test passed. This fails on MacOS more frequently so we need to observe it in scheduled tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#56309 from gaogaotiantian/test-parity-listener.
Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Tian Gao <gaogaotiantian@hotmail.com>
(cherry picked from commit 3fce4cf)
Signed-off-by: Tian Gao <gaogaotiantian@hotmail.com>
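The idea of tolerating a "table not created yet" error while an asynchronous writer catches up might look roughly like the following. This is a minimal sketch and not the actual test change: the `AnalysisException` class here is a local stand-in for `pyspark.errors.AnalysisException`, and `read_when_ready` and `fake_read` are hypothetical helpers.

```python
import time


class AnalysisException(Exception):
    """Local stand-in for pyspark.errors.AnalysisException (assumption)."""


def read_when_ready(read_table, timeout=5.0, interval=0.05):
    """Poll read_table() until it succeeds, ignoring 'table not created yet' errors."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return read_table()
        except AnalysisException:
            # Acceptable while the asynchronous writer has not created the table.
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)


# Simulated reader: the table "appears" on the third attempt.
attempts = []


def fake_read():
    attempts.append(1)
    if len(attempts) < 3:
        raise AnalysisException("TABLE_OR_VIEW_NOT_FOUND")
    return ["terminated-event"]


rows = read_when_ready(fake_read, timeout=2.0, interval=0.01)
```

The deadline keeps a genuinely missing table from being masked forever: once the timeout passes, the last exception is re-raised.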
…ing LICENSE and NOTICE files
### What changes were proposed in this pull request?
Make the PySpark and SparkR source distributions include top-level LICENSE and NOTICE files:
- Add `include LICENSE` / `include NOTICE` to `python/MANIFEST.in`.
- In `dev/make-distribution.sh`, copy LICENSE/NOTICE into `python/` and `R/pkg/` before building the sdists (with an EXIT trap to clean them up).
- The classic pyspark sdist bundles the assembly jars, so it ships the binary license variants (LICENSE-binary/NOTICE-binary plus the full licenses-binary set), mirroring the binary distribution. The pyspark_connect, pyspark_client, and SparkR artifacts bundle no jars, so they ship the plain source LICENSE/NOTICE. `packaging/classic/setup.py` falls back to `licenses/` when `licenses-binary/` is absent (RELEASE-mode builds).
- During the SparkR build, `+ file LICENSE` is added to DESCRIPTION temporarily (restored after the build) so `R CMD check --as-cran` does not warn that the bundled LICENSE is not mentioned. The committed DESCRIPTION is unchanged, so SparkR CI is unaffected. The "Non-standard file/directory found at top level: 'NOTICE'" NOTE cannot be silenced this way and is expected in release-build logs.
- Add tar-listing regression guards after the sdist and SparkR builds that fail the release build if LICENSE or NOTICE ever goes missing again, instead of relying on an RC vote to catch it.
### Why are the changes needed?
ASF release policy requires every distributed artifact to include LICENSE and NOTICE. The pyspark, pyspark_connect, pyspark_client, and SparkR source tarballs currently don't, which was raised as a -1 during the Spark 4.2.0 RC1 vote.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Built the source distributions locally and confirmed each contains LICENSE and NOTICE at the package root.
### Was this patch authored or co-authored using generative AI tooling?
Yes, drafted with assistance from Cursor.
Closes apache#56453 from huaxingao/fix_license.
Authored-by: Huaxin Gao <huaxin.gao11@gmail.com>
Signed-off-by: huaxin-gao_snow <huaxin.gao@snowflake.com>
(cherry picked from commit b88952c)
Signed-off-by: huaxin-gao_snow <huaxin.gao@snowflake.com>
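A tar-listing guard of the kind this commit message describes could be sketched as below. The real guard lives in the release shell script; this Python version is only an illustration, and `missing_license_files`, `build_fake_sdist`, and the `demo_pkg-0.0.0` package name are hypothetical. It assumes the sdist unpacks into a single top-level directory.

```python
import io
import tarfile

REQUIRED = ("LICENSE", "NOTICE")


def missing_license_files(sdist, required=REQUIRED):
    """Return required files absent from the package root of an open sdist tarball."""
    names = set(sdist.getnames())
    # Assumption: the sdist unpacks into exactly one top-level directory.
    roots = {name.split("/", 1)[0] for name in names}
    if len(roots) != 1:
        raise ValueError(f"expected one top-level directory, found {sorted(roots)}")
    root = roots.pop()
    return [f for f in required if f"{root}/{f}" not in names]


def build_fake_sdist(files):
    """Build an in-memory .tar.gz for demonstration (hypothetical package name)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name in files:
            data = b"placeholder\n"
            info = tarfile.TarInfo(f"demo_pkg-0.0.0/{name}")
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    buf.seek(0)
    return tarfile.open(fileobj=buf, mode="r:gz")


with build_fake_sdist(["LICENSE", "NOTICE", "setup.py"]) as good:
    good_missing = missing_license_files(good)
with build_fake_sdist(["LICENSE", "setup.py"]) as bad:
    bad_missing = missing_license_files(bad)
```

A release script would fail the build whenever the returned list is non-empty, which catches the omission before an RC vote does.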
…lved SHA in maven_test.yml and python_hosted_runner_test.yml
### What changes were proposed in this pull request?
In `.github/workflows/maven_test.yml` and `.github/workflows/python_hosted_runner_test.yml`, add a step to the precompile job (`precompile-maven` / `precompile`) that captures `git rev-parse HEAD` right after the apache/spark checkout, expose it as a `head_sha` job output, and switch the downstream `build` job's `actions/checkout` from `ref: ${{ inputs.branch }}` to `ref: ${{ needs.<precompile-job>.outputs.head_sha || inputs.branch }}`.
This is the same pinning that SPARK-56866 applied to `build_and_test.yml`, with one adaptation. Unlike the mandatory `precondition` job there (downstream jobs are skipped when it fails), the precompile jobs here are best-effort: they run with `continue-on-error: true` and the `build` job proceeds with `if: (!cancelled())` even when precompile fails. The `|| inputs.branch` fallback keeps that degraded path intact: if the precompile job dies before resolving the SHA, the `build` job resolves `inputs.branch` itself, exactly as today. Without the fallback, an empty `ref:` would make `actions/checkout` fall back to the default branch or the (master) event SHA, which is wrong for `branch-4.x` runs.
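The fallback semantics described above can be modelled in a few lines. This is a toy model of the expression `needs.<precompile-job>.outputs.head_sha || inputs.branch`, not workflow code: `resolve_checkout_ref` is a hypothetical name, and the SHA and branch values are made up for illustration. It relies on the fact that an unset job output evaluates to an empty string, which `||` treats as falsy.

```python
def resolve_checkout_ref(head_sha, branch):
    """Model of `head_sha || branch`: the first non-empty operand wins."""
    return head_sha or branch


# Precompile resolved a SHA: every build matrix entry is pinned to it.
pinned = resolve_checkout_ref("abc1234", "branch-4.2")

# Precompile died before resolving the SHA: the build job resolves the branch itself.
degraded = resolve_checkout_ref("", "branch-4.2")

# Without the fallback, the ref would simply be the empty output.
unguarded = ""
```

The `unguarded` case is the one the fallback exists to prevent, since an empty `ref:` does not point at the requested release branch.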
### Why are the changes needed?
These two reusable workflows have the same cross-job checkout race that SPARK-56866 fixed in `build_and_test.yml`: the precompile job builds Spark and uploads the compiled output as an artifact, then each `build` matrix entry independently re-resolves `ref: ${{ inputs.branch }}` at the moment its runner picks it up. The matrix only starts after the full precompile build finishes (tens of minutes), so the drift window is structurally long. If the branch advances in between, a `build` entry checks out a newer commit than what was precompiled and runs tests against stale compile artifacts extracted on top of it, producing the same class of spurious mixed-commit failures described in SPARK-56866.
Workflows covered by this change:
- `maven_test.yml` is called by 11 workflows: `build_maven.yml`, `build_maven_java21.yml`, `build_maven_java21_arm.yml`, `build_maven_java21_macos26.yml`, `build_maven_java25.yml`, and `build_branch4{0,1,2}_maven*.yml`.
- `python_hosted_runner_test.yml` is called by `build_python_3.12_arm.yml` and `build_python_3.12_macos26.yml`.
### Does this PR introduce _any_ user-facing change?
No. CI infrastructure only.
### How was this patch tested?
YAML syntax validated locally. These workflows are not exercised by PR CI (their callers are schedule-triggered daily jobs), so the change takes effect on their next scheduled runs; it mirrors the `build_and_test.yml` change that has been running since 2026-05-19.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-fable-5)
Closes apache#56450 from zhengruifeng/ci-pin-checkout-sha-other-workflows-dev4.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
(cherry picked from commit 0e1101a)
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
… type
### What changes were proposed in this pull request?
Check whether the imported class is a real UserDefinedType before calling it.
### Why are the changes needed?
If it's not a UDT, it could run wrong code.
### Does this PR introduce _any_ user-facing change?
Not for normal use.
### How was this patch tested?
Regression test added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#56460 from gaogaotiantian/add-import-type-check.
Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
(cherry picked from commit 1f53674)
Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
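The check described in this commit message, verifying that a dynamically imported object really is a UDT class before calling it, might be sketched as follows. This is not the PySpark implementation: the `UserDefinedType` class here is a local stand-in for `pyspark.sql.types.UserDefinedType`, and `PointUDT`, `resolve`, and `instantiate_udt` are hypothetical names.

```python
import importlib


class UserDefinedType:
    """Local stand-in for pyspark.sql.types.UserDefinedType (assumption)."""


class PointUDT(UserDefinedType):
    """Example UDT used only for this sketch."""


def resolve(module_name, attr_name):
    """Import a module and look up an attribute named by (untrusted) metadata."""
    return getattr(importlib.import_module(module_name), attr_name)


def instantiate_udt(candidate):
    """Call the candidate only if it is a genuine UserDefinedType subclass."""
    if not (isinstance(candidate, type) and issubclass(candidate, UserDefinedType)):
        raise TypeError(f"{candidate!r} is not a UserDefinedType subclass")
    return candidate()


udt = instantiate_udt(PointUDT)

# An arbitrary importable callable is rejected instead of being invoked.
try:
    instantiate_udt(resolve("os", "getcwd"))
    rejected = False
except TypeError:
    rejected = True
```

The `isinstance(candidate, type)` test comes first because `issubclass` raises its own `TypeError` when given a non-class, and checking both before the call means no foreign code runs.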
fbf9f21 to 87a7e5a
Signed-off-by: Petr Fedchenkov <giggsoff@gmail.com>
Bump all module versions, docs config, PySpark and R package versions from 4.2.0.1-4.3.0-0 to 4.2.0.1-4.3.0-1.
Signed-off-by: Petr Fedchenkov <giggsoff@gmail.com>
87a7e5a to 415d891
The "bounded memory usage calculation" test asserted that the global
RocksDBMemoryManager singleton reports 0 unbounded instances. That count
is shared across all tests in the JVM: a preceding unbounded test can leave
a straggler updateMemoryUsage(..., isBoundedMemory=false) call that lands in
instanceMemoryMap after this test's reset and is never unregistered (the store
has already closed), permanently polluting the map. eventually{} cannot drain
it, producing the SPARK-55993 flake "1 did not equal 0".
Drop the brittle global unbounded-count assertion and keep the stable signal:
this query registers exactly 2 bounded instances and reports memory usage.
Sync changes from v4.2.0-rc3 and update version to 4.2.0.1-4.3.0-1