
[Data] OperatorFusionRule: capability-based fusion check #64244

Open
ShockYoungCHN wants to merge 1 commit into ray-project:master from ShockYoungCHN:capbased-fusion

@ShockYoungCHN


Refactor "down absorbs up" fusion to capability-based dispatch

Move fusion construction from the rule into the operator itself. The rule no longer isinstance-checks specific classes on the absorber side; instead, it asks the operator through two new PhysicalOperator hooks:

  • absorbs_upstream_map_transformer() → bool
  • fuse_with_upstream_map_transformer(transformer) → PhysicalOperator

AllToAllOperator implements both, carrying the same transform-fn wrapping logic that was previously in _get_fused_all_to_all_operator.
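The two hooks can be sketched roughly as follows. This is a minimal, self-contained illustration and not the Ray source: `PhysicalOperator` here is a stand-in class, `ToyAbsorber` is a hypothetical absorber playing the role of AllToAllOperator, and the fused function applies the transformer inline rather than handing it to a shuffle scheduler.

```python
from typing import Any, Callable, List


class PhysicalOperator:
    """Stand-in base class for illustration; not Ray's real PhysicalOperator."""

    def __init__(self, name: str) -> None:
        self.name = name

    def absorbs_upstream_map_transformer(self) -> bool:
        # Default: the operator does not opt into "down absorbs up" fusion.
        return False

    def fuse_with_upstream_map_transformer(self, transformer: Any) -> "PhysicalOperator":
        # Default: a non-absorber cannot build a fused replacement.
        raise NotImplementedError(
            f"{self.name} does not absorb upstream map transformers"
        )


class ToyAbsorber(PhysicalOperator):
    """Hypothetical absorber; simplified compared to AllToAllOperator."""

    def __init__(self, name: str, bulk_fn: Callable[[List[Any]], List[Any]]) -> None:
        super().__init__(name)
        self.bulk_fn = bulk_fn

    def absorbs_upstream_map_transformer(self) -> bool:
        return True

    def fuse_with_upstream_map_transformer(self, transformer: Any) -> "ToyAbsorber":
        original = self.bulk_fn

        def fused(blocks: List[Any]) -> List[Any]:
            # Simplification: run the upstream transformer on each block,
            # then hand the results to the original bulk function.
            return original([transformer(b) for b in blocks])

        return ToyAbsorber(f"{transformer.__name__}->{self.name}", fused)
```

With these defaults, only operators that override both methods participate in this fusion path; every other operator keeps reporting `False` and is left alone.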

FuseOperators changes:

  • rename _fuse_all_to_all_operators_in_dag to _fuse_absorber_operators_in_dag
  • rename _get_fused_all_to_all_operator to _get_fused_absorber_operator (delegates to down_op.fuse_with_upstream_map_transformer)
  • _can_fuse adds a new case, TaskPoolMapOperator → any absorber, and removes the TaskPoolMapOperator → AllToAllOperator case that it subsumes

No behavior change: AllToAllOperator is currently the only absorber. The new hooks open a seam for future sink ops to opt into fusion without modifying the rule.
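To illustrate why the capability check removes the need to touch the rule, here is a small sketch of the new `_can_fuse` case. All class names (`Op`, `TaskPoolMapOp`, `ToySink`) and the helper `can_fuse_into_absorber` are hypothetical stand-ins, and the real rule performs additional checks that are omitted here.

```python
class Op:
    """Stand-in operator; non-absorbing by default."""

    def __init__(self, name: str) -> None:
        self.name = name

    def absorbs_upstream_map_transformer(self) -> bool:
        return False


class TaskPoolMapOp(Op):
    """Stand-in for an upstream task-pool map operator."""


class ToySink(Op):
    """Hypothetical future sink that opts into fusion by overriding the hook."""

    def absorbs_upstream_map_transformer(self) -> bool:
        return True


def can_fuse_into_absorber(down_op: Op, up_op: Op) -> bool:
    # Capability-based check: the upstream side is still matched by type,
    # but the downstream side is asked whether it can absorb, so the rule
    # never needs to name a concrete absorber class.
    return isinstance(up_op, TaskPoolMapOp) and down_op.absorbs_upstream_map_transformer()
```

Adding `ToySink` required no change to `can_fuse_into_absorber`, which is the property the PR is after.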

@ShockYoungCHN ShockYoungCHN requested a review from a team as a code owner June 21, 2026 09:10

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request generalizes operator fusion in Ray Data by introducing a mechanism for physical operators to declare if they can absorb an upstream TaskPoolMapOperator's MapTransformer (via absorbs_upstream_map_transformer) and how to perform the fusion (via fuse_with_upstream_map_transformer). The AllToAllOperator is updated to implement this capability, and the operator fusion rule is refactored to support any such 'absorber' operator rather than being hardcoded to AllToAllOperator. Feedback suggests making the logical operator rebuilding in _build_fused_logical_op more defensive by using .get() and getattr to avoid potential KeyError or AttributeError issues.
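The defensive pattern suggested in the review can be sketched as below. This is only an illustration of `.get()` and `getattr` with defaults: `FakeRepartition`, `describe_logical_op`, and the attribute names `num_outputs` and `seed` are assumptions for the example, not the actual fields used by `_build_fused_logical_op`.

```python
from typing import Any, Dict, Optional


class FakeRepartition:
    """Hypothetical logical op that carries only some of the attributes."""

    def __init__(self, num_outputs: int) -> None:
        self.num_outputs = num_outputs


def describe_logical_op(
    op_map: Dict[Any, Any], physical_op: Any
) -> Optional[Dict[str, Any]]:
    # dict.get returns None for a missing key instead of raising KeyError.
    logical_op = op_map.get(physical_op)
    if logical_op is None:
        return None
    return {
        "type": type(logical_op).__name__,
        # getattr with a default avoids AttributeError when a field is absent.
        "num_outputs": getattr(logical_op, "num_outputs", None),
        "seed": getattr(logical_op, "seed", None),
    }
```

Whether a missing entry should be tolerated or treated as a bug is a design choice; returning `None` hides the problem from the caller, so an explicit error may be preferable where the mapping is expected to be complete.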


Comment thread python/ray/data/_internal/logical/rules/operator_fusion.py Outdated
@ray-gardener ray-gardener Bot added data Ray Data-related issues community-contribution Contributed by the community labels Jun 21, 2026
@ShockYoungCHN ShockYoungCHN removed the community-contribution Contributed by the community label Jun 22, 2026
…ber side

Refactors the "down absorbs up" fusion path so the rule no longer
isinstance-checks specific operator classes on the absorber side. The
absorber owns the construction of the fused replacement; the rule owns
DAG-level bookkeeping (logical-op replacement in _op_map).

Before:
  - _fuse_all_to_all_operators_in_dag isinstance-checked AllToAllOperator
  - _get_fused_all_to_all_operator built the fused AllToAllOperator with
    the upstream MapTransformer stashed on TaskContext, plus rebuilt the
    corresponding RandomShuffle/Repartition logical op.
  - Any new sink op (future shuffle backends, write sinks, etc.) wanting
    the same fusion would have required modifying the rule.

After:
  - PhysicalOperator gains a pair of capability methods:
    - absorbs_upstream_map_transformer() -> bool  (default False)
    - fuse_with_upstream_map_transformer(transformer) -> PhysicalOperator
      (default raises NotImplementedError)
  - AllToAllOperator overrides both. Its fuse_with_upstream_map_transformer
    carries the prior _get_fused_all_to_all_operator transform-fn logic
    verbatim: the new bulk_fn wraps the original transform_fn and stashes
    the upstream MapTransformer (plus its ray_remote_args) on TaskContext,
    so the existing shuffle scheduler picks them up and wraps each map task.
    Behavior-preserving.
  - OperatorFusionRule:
    - _is_absorber(op) reduces to op.absorbs_upstream_map_transformer().
    - _get_fused_absorber_operator(down, up) reduces to
      down.fuse_with_upstream_map_transformer(up.get_map_transformer())
      plus shared bookkeeping (_build_fused_logical_op handles the
      RandomShuffle/Repartition logical-op rebuild).
    - _can_fuse adds a third branch for any capability-implementing
      downstream; it skips the rule's MapOperator-oriented logical-op
      compat checks (absorbers handle compat in their fuse method).
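The transform-fn wrapping described above can be sketched as follows. This is a toy model under stated assumptions: `ToyTaskContext`, its field names, `toy_shuffle`, and `wrap_transform_fn` are all hypothetical, and the real shuffle scheduler wraps remote map tasks rather than calling the transformer inline.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ToyTaskContext:
    """Stand-in for TaskContext; field names are assumptions."""

    upstream_map_transformer: Optional[Callable[[Any], Any]] = None
    upstream_map_ray_remote_args: Optional[Dict[str, Any]] = None


def toy_shuffle(blocks: List[Any], ctx: ToyTaskContext) -> List[Any]:
    # Stand-in for the shuffle scheduler: if a transformer was stashed on
    # the context, apply it in each "map task", then "shuffle" by reversing.
    fn = ctx.upstream_map_transformer
    mapped = [fn(b) if fn is not None else b for b in blocks]
    return list(reversed(mapped))


def wrap_transform_fn(
    transform_fn: Callable[[List[Any], ToyTaskContext], List[Any]],
    transformer: Callable[[Any], Any],
    ray_remote_args: Dict[str, Any],
) -> Callable[[List[Any], ToyTaskContext], List[Any]]:
    def fused_bulk_fn(blocks: List[Any], ctx: ToyTaskContext) -> List[Any]:
        # Stash the upstream transformer and its remote args on the context,
        # then delegate to the original transform function unchanged.
        ctx.upstream_map_transformer = transformer
        ctx.upstream_map_ray_remote_args = ray_remote_args
        return transform_fn(blocks, ctx)

    return fused_bulk_fn
```

The wrapper itself never calls the transformer; it only makes it visible to the downstream transform function, which is why the original function can stay untouched.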

The legacy _get_fused_all_to_all_operator helper is removed; the
fuse_with_upstream_map_transformer method on AllToAllOperator is the
only place that knows the AllToAllOperator construction details.

No behavior change for current users (AllToAllOperator is the only
absorber in master). The interface is the seam future sink ops can use
to opt into fusion without modifying this rule.

Signed-off-by: Yuanzhuo Yang <yuanzhuoyang@gmail.com>