Incremental GROUP BY _block_num and DISTINCT BY _block_num#1877

Open
leoyvens wants to merge 19 commits into main from incremental-distinct-on

Conversation

@leoyvens
Collaborator

This implements two related features:

  1. Streaming DISTINCT ON and GROUP BY, when the key is _block_num
  2. A function-call syntax, block_num(), that makes it easier to refer to the block number anywhere in the query.

Our execution semantics essentially already support the special case where an aggregation is restricted to a single block, thanks to the assumption that data for the same block never spans more than one microbatch. Executing the aggregation in isolation on each microbatch therefore yields correct results. The necessary changes were to the incremental query validation checks and to block number propagation.
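
To illustrate the argument above, here is a minimal, self-contained sketch in plain Rust. It is not the code in this PR: the `Row` type, the function names, and the sample data are all hypothetical, and a simple sum stands in for an arbitrary aggregate. It only shows that when no block is split across microbatches, aggregating each microbatch in isolation and concatenating the outputs gives the same rows as one aggregation over all the data.

```rust
use std::collections::BTreeMap;

/// Hypothetical row shape: a value tagged with the block it came from.
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub block_num: u64,
    pub value: i64,
}

/// Stand-in for `SELECT _block_num, sum(value) ... GROUP BY _block_num`
/// evaluated over a single set of rows. Output is ordered by block number.
pub fn sum_by_block(rows: &[Row]) -> Vec<(u64, i64)> {
    let mut sums: BTreeMap<u64, i64> = BTreeMap::new();
    for row in rows {
        *sums.entry(row.block_num).or_insert(0) += row.value;
    }
    sums.into_iter().collect()
}

/// Runs the same aggregation on each microbatch in isolation and
/// concatenates the per-batch outputs in arrival order.
pub fn sum_by_block_incremental(microbatches: &[Vec<Row>]) -> Vec<(u64, i64)> {
    microbatches
        .iter()
        .flat_map(|batch| sum_by_block(batch))
        .collect()
}

/// Sample data (assumed): blocks 1 and 2 arrive in the first microbatch,
/// block 3 in the second. No block spans two microbatches.
pub fn sample_microbatches() -> Vec<Vec<Row>> {
    vec![
        vec![
            Row { block_num: 1, value: 10 },
            Row { block_num: 1, value: 20 },
            Row { block_num: 2, value: 5 },
        ],
        vec![
            Row { block_num: 3, value: 7 },
            Row { block_num: 3, value: 5 },
        ],
    ]
}
```

If a block were split across two microbatches, `sum_by_block_incremental` would emit two partial rows for that block instead of one, which is why the single-microbatch assumption matters here.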

@leoyvens leoyvens requested a review from Theodus February 27, 2026 14:49
@leoyvens leoyvens marked this pull request as draft February 27, 2026 14:49
@leoyvens leoyvens marked this pull request as ready for review February 27, 2026 15:00
Contributor

@LNSD LNSD left a comment

Please, check my comments 🙂

pub use datafusion::{arrow, parquet};
pub use datasets_common::{block_num::BlockNum, block_range::BlockRange, end_block::EndBlock};

pub mod block_num_udf;
Contributor

Can we rename the module to just block_num? Maybe we can also move all the UDFs under common::udfs (e.g., common::udfs::evm::* or common::udfs::block_num).

Collaborator Author

Renamed

@leoyvens leoyvens force-pushed the incremental-distinct-on branch 3 times, most recently from f9ae9a5 to ed06c86 Compare March 3, 2026 19:45
Contributor

I meant moving common::block_num to common::udfs::block_num (from crates/core/common/src/block_num.rs to crates/core/common/src/udfs/block_num.rs).

Collaborator Author

Done

@LNSD
Contributor

LNSD commented Mar 4, 2026

The CI failure is related to the changes: https://github.com/edgeandnode/amp/actions/runs/22639732283/job/65612193391?pr=1877

@leoyvens leoyvens force-pushed the incremental-distinct-on branch 2 times, most recently from e73514b to 3948177 Compare March 5, 2026 11:44
@LNSD
Contributor

LNSD commented Mar 5, 2026

@leoyvens, if this is still WIP, could you mark the PR as a draft?

@leoyvens leoyvens marked this pull request as draft March 5, 2026 12:01
@leoyvens
Collaborator Author

leoyvens commented Mar 5, 2026

@LNSD yes CI is failing for legit reasons, marked as draft

@LNSD
Contributor

LNSD commented Mar 5, 2026

> @LNSD yes CI is failing for legit reasons, marked as draft

Thanks. The issue was that I was flooded with notifications to review the PR while it wasn't ready. Using draft mode helps signal when the PR is actually ready to be reviewed.

@leoyvens leoyvens marked this pull request as ready for review March 5, 2026 13:34
@leoyvens leoyvens force-pushed the incremental-distinct-on branch from 64f28c7 to a0ab4ad Compare March 5, 2026 13:40
@leoyvens
Collaborator Author

leoyvens commented Mar 5, 2026

@Theodus This is now ready for review

leoyvens and others added 12 commits March 6, 2026 17:48
Signed-off-by: Leonardo Yvens <leoyvens@gmail.com>
Allow GROUP BY queries that include _block_num as a group key to work with
incremental processing instead of being rejected.

- Handle Aggregate in BlockNumPropagator by setting next_block_num_expr
- Remove Aggregate from the unsupported-node error arm

Signed-off-by: Leonardo Yvens <leoyvens@gmail.com>
Signed-off-by: Leo <leo@edgeandnode.com>
…gation

Adds a `block_num()` sentinel UDF that lets users explicitly request the
propagated `_block_num` value in projections and DISTINCT ON expressions,
particularly in join contexts where the bare `_block_num` column would be
ambiguous.

Key changes:
- Register BlockNumUdf in builtin_udfs() in session_state
- BlockNumPropagator now replaces block_num() UDF with the correct
  greatest(left._block_num, right._block_num) expression from the join
- forbid_underscore_prefixed_aliases enhanced to check all node types (not
  just Projection) and to reject bare _block_num in multi-table projections
- incremental_op_kind uses expr_outputs_block_num for Aggregate/Distinct::On
  first-key checks, accepting post-propagation expressions derived from _block_num
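
The `block_num()` replacement described in this commit message can be pictured with a small sketch. This is only an illustration under stated assumptions: the `Expr` enum below is a hypothetical, simplified expression tree, not DataFusion's `Expr`, and `col`, `join_block_num`, and `replace_block_num` are made-up helper names rather than functions from this PR. It shows the idea of swapping a sentinel call for the propagated greatest(left._block_num, right._block_num) expression wherever it appears.

```rust
/// Hypothetical, simplified expression tree (not DataFusion's `Expr`).
#[derive(Clone, Debug, PartialEq)]
pub enum Expr {
    /// A qualified column such as `left._block_num`.
    Column { table: String, name: String },
    /// The `block_num()` sentinel call written by the user.
    BlockNumCall,
    /// `greatest(a, b)`.
    Greatest(Box<Expr>, Box<Expr>),
    /// Any other binary operation, e.g. `a + b`.
    Binary { op: String, left: Box<Expr>, right: Box<Expr> },
}

/// Convenience constructor for a qualified column (illustrative helper).
pub fn col(table: &str, name: &str) -> Expr {
    Expr::Column { table: table.to_string(), name: name.to_string() }
}

/// The block number of a joined row, assumed here to be the greatest of
/// the two inputs' `_block_num` columns, as the commit message describes.
pub fn join_block_num(left: &str, right: &str) -> Expr {
    Expr::Greatest(
        Box::new(col(left, "_block_num")),
        Box::new(col(right, "_block_num")),
    )
}

/// Recursively replaces every `block_num()` sentinel in `expr` with the
/// `propagated` expression; all other nodes are rebuilt unchanged.
pub fn replace_block_num(expr: &Expr, propagated: &Expr) -> Expr {
    match expr {
        Expr::BlockNumCall => propagated.clone(),
        Expr::Column { .. } => expr.clone(),
        Expr::Greatest(a, b) => Expr::Greatest(
            Box::new(replace_block_num(a, propagated)),
            Box::new(replace_block_num(b, propagated)),
        ),
        Expr::Binary { op, left, right } => Expr::Binary {
            op: op.clone(),
            left: Box::new(replace_block_num(left, propagated)),
            right: Box::new(replace_block_num(right, propagated)),
        },
    }
}
```

In this toy model, a projection that calls `block_num()` over a join of tables `t` and `l` would be rewritten to `greatest(t._block_num, l._block_num)`, so the user never has to pick one side's `_block_num` column by hand.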
@leoyvens leoyvens force-pushed the incremental-distinct-on branch from a0ab4ad to bdca7e3 Compare March 6, 2026 17:58
@leoyvens leoyvens force-pushed the incremental-distinct-on branch from bdca7e3 to 85f869c Compare March 6, 2026 18:23
Member

@Theodus Theodus left a comment

LGTM
