[FEAT] Add SparkSession tag management for Interrupt() #183
Open
40u5 wants to merge 1 commit into
Surfaces SparkSession.AddTag / RemoveTag / GetTags / ClearTags so user code can tag operations and later cancel them by tag via InterruptTag. Tags are stored on the underlying SparkConnectClient (mutex-protected) and threaded into ExecutePlanRequest.Tags on every execute, matching PySpark's tag semantics. Validation matches the Spark Connect contract: tags must be non-empty and must not contain ','. Together with the Interrupt() / InterruptTag / InterruptOperation surface added in apache#182, this closes apache#49.
What changes were proposed in this pull request?
Closes #49.
Adds the per-session tag management API that complements the Interrupt/InterruptTag/InterruptOperation surface introduced in #182, so that callers can tag operations and then cancel them by tag:
- SparkSession.AddTag(tag string) error: attaches a tag to every subsequent operation issued by the session.
- SparkSession.RemoveTag(tag string) error: removes a previously added tag. Removing a tag that was never added is a no-op.
- SparkSession.GetTags() []string: returns the tags currently attached to the session, sorted lexicographically for deterministic output.
- SparkSession.ClearTags(): drops every tag attached to the session.
Plumbing:
- Tags are stored on sparkConnectClientImpl behind a sync.RWMutex so the set is safe to mutate from multiple goroutines.
- newExecutePlanRequest now sets request.Tags = s.GetTags(), so the tags ride along on every ExecutePlan/ExecuteCommand issued after AddTag.
- The SparkConnectClient interface (spark/client/base/base.go) gains the same four methods, and spark/mocks/mock_executor.go is updated to satisfy the extended interface.
- Validation lives in base.ValidateTag so both the real client and the in-tree mock enforce the Spark Connect contract: tags must be non-empty and must not contain ','. Invalid input returns sparkerrors.InvalidArgumentError.
Why are the changes needed?
Issue #49 asks for Interrupt() support on the Go client. #182 surfaces InterruptAll/InterruptTag/InterruptOperation, but InterruptTag is only useful if callers can attach tags to operations, which today they cannot, because there is no equivalent of PySpark's SparkSession.addTag/removeTag/getTags/clearTags. This PR fills that gap.
Does this PR introduce any user-facing change?
Yes, additive only:
- SparkSession gains AddTag, RemoveTag, GetTags, ClearTags.
- The SparkConnectClient interface gains the same four methods; there are no known external implementers, and the in-tree mocks.TestExecutor is updated in the same commit.
- base.ValidateTag is exported so callers can reuse the same validation if they want.
No backward-incompatible breakages.
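For reviewers who want the tag semantics in one place, the behaviour described in this PR (validation, deduplication, sorted GetTags, no-op removal, mutex protection) can be sketched as a standalone program. This is an illustration only, not the code in the diff: tagSet, newTagSet, validateTag, and errInvalidTag are hypothetical stand-ins for sparkConnectClientImpl, base.ValidateTag, and sparkerrors.InvalidArgumentError, and validating the argument in RemoveTag is an assumption about why that method returns an error.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// errInvalidTag is a hypothetical stand-in for sparkerrors.InvalidArgumentError.
var errInvalidTag = errors.New("invalid tag")

// validateTag mirrors the contract stated in the PR: non-empty, no ','.
func validateTag(tag string) error {
	if tag == "" {
		return fmt.Errorf("%w: tag must not be empty", errInvalidTag)
	}
	if strings.Contains(tag, ",") {
		return fmt.Errorf("%w: tag %q must not contain ','", errInvalidTag, tag)
	}
	return nil
}

// tagSet is a hypothetical mutex-protected set of session tags.
type tagSet struct {
	mu   sync.RWMutex
	tags map[string]struct{}
}

func newTagSet() *tagSet { return &tagSet{tags: make(map[string]struct{})} }

// AddTag stores a valid tag; adding the same tag twice keeps one copy.
func (s *tagSet) AddTag(tag string) error {
	if err := validateTag(tag); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tags[tag] = struct{}{}
	return nil
}

// RemoveTag deletes a tag; removing an absent tag is a no-op.
// Validating here is an assumption made for this sketch.
func (s *tagSet) RemoveTag(tag string) error {
	if err := validateTag(tag); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.tags, tag)
	return nil
}

// GetTags returns a sorted copy so callers see deterministic output.
func (s *tagSet) GetTags() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]string, 0, len(s.tags))
	for t := range s.tags {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

// ClearTags drops every tag.
func (s *tagSet) ClearTags() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tags = make(map[string]struct{})
}

func main() {
	s := newTagSet()
	_ = s.AddTag("nightly")
	_ = s.AddTag("etl")
	_ = s.AddTag("etl")                 // duplicate, stored once
	fmt.Println(s.GetTags())            // [etl nightly]
	fmt.Println(s.AddTag("a,b") != nil) // true
	s.ClearTags()
	fmt.Println(len(s.GetTags())) // 0
}
```

A map keyed by tag gives deduplication for free, and returning a fresh sorted slice from GetTags means callers cannot mutate the stored set through the result.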
How was this patch tested?
Added unit tests in spark/client/client_test.go:
- TestAddTagRejectsInvalidInput: an empty tag and a tag containing ',' both return sparkerrors.InvalidArgumentError and are not stored.
- TestTagsRoundTripAddRemoveClear: AddTag dedupes, GetTags returns sorted output, RemoveTag works, including removing tags that were never added (no-op), and ClearTags resets the set.
- TestExecutePlanRequestCarriesSessionTags: wraps the existing mock in a recorder that captures the ExecutePlanRequest and verifies that tags configured via AddTag end up on request.Tags, and that ClearTags scrubs them from subsequent requests.
And in spark/sql/sparksession_test.go:
- TestSparkSessionTagsRoundTripThroughClient: exercises the same flow through the public SparkSession API, including invalid-tag rejection.
Locally:
- go build ./... is clean.
- go vet ./... is clean.
- gofmt -l ./spark reports no diff.
- go test $(go list ./... | grep -v internal/tests/integration) passes. The integration suite is excluded because it requires SPARK_HOME and is unrelated to this change.
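The recorder approach used by TestExecutePlanRequestCarriesSessionTags can be illustrated with a minimal standalone sketch. Every type here is hypothetical and chosen for brevity: request stands in for ExecutePlanRequest (only the Tags field is modelled), executor and noopExecutor stand in for the mocked transport, and client is a simplified single-goroutine stand-in for the real client with no locking or validation. The actual test in the diff may be structured differently.

```go
package main

import (
	"fmt"
	"sort"
)

// request is a hypothetical stand-in for ExecutePlanRequest; only Tags is modelled.
type request struct {
	Tags []string
}

// executor is a hypothetical stand-in for the transport the client calls.
type executor interface {
	Execute(req *request) error
}

// noopExecutor plays the role of the existing mock.
type noopExecutor struct{}

func (noopExecutor) Execute(*request) error { return nil }

// recorder wraps another executor and keeps every request it sees.
type recorder struct {
	inner executor
	seen  []*request
}

func (r *recorder) Execute(req *request) error {
	r.seen = append(r.seen, req)
	return r.inner.Execute(req)
}

// client is a simplified, hypothetical client that stamps its tags on each request.
type client struct {
	exec executor
	tags map[string]struct{}
}

func (c *client) AddTag(tag string) { c.tags[tag] = struct{}{} }
func (c *client) ClearTags()        { c.tags = map[string]struct{}{} }

// run builds a request carrying the current tags (sorted) and sends it.
func (c *client) run() error {
	tags := make([]string, 0, len(c.tags))
	for t := range c.tags {
		tags = append(tags, t)
	}
	sort.Strings(tags)
	return c.exec.Execute(&request{Tags: tags})
}

func main() {
	rec := &recorder{inner: noopExecutor{}}
	c := &client{exec: rec, tags: map[string]struct{}{}}

	c.AddTag("nightly")
	c.AddTag("etl")
	_ = c.run() // first request should carry both tags

	c.ClearTags()
	_ = c.run() // second request should carry none

	fmt.Println(rec.seen[0].Tags, len(rec.seen[1].Tags)) // [etl nightly] 0
}
```

Wrapping the mock rather than replacing it keeps the mock's canned responses intact while still exposing the outgoing request for assertions.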