Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,9 @@ impl ExecutionPlan for AggregateExec {

// If this node tried to pushdown some dynamic filter before, now we check
// if the child accept the filter
if matches!(phase, FilterPushdownPhase::Post) && self.dynamic_filter.is_some() {
if matches!(phase, FilterPushdownPhase::Post)
&& let Some(dyn_filter) = &self.dynamic_filter
{
// let child_accepts_dyn_filter = child_pushdown_result
// .self_filters
// .first()
Expand All @@ -1540,7 +1542,6 @@ impl ExecutionPlan for AggregateExec {
// So here, we try to use ref count to determine if the dynamic filter
// has actually be pushed down.
// Issue: <https://github.com/apache/datafusion/issues/18856>
let dyn_filter = self.dynamic_filter.as_ref().unwrap();
let child_accepts_dyn_filter = Arc::strong_count(dyn_filter) > 1;

if !child_accepts_dyn_filter {
Expand Down
8 changes: 5 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,15 +1529,17 @@ impl SortMergeJoinStream {

// Prepare the columns we apply join filter on later.
// Only for joined rows between streamed and buffered.
let filter_columns = if chunk.buffered_batch_idx.is_some() {
let filter_columns = if let Some(buffered_batch_idx) =
chunk.buffered_batch_idx
{
if !matches!(self.join_type, JoinType::Right) {
if matches!(
self.join_type,
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark
) {
let right_cols = fetch_right_columns_by_idxs(
&self.buffered_data,
chunk.buffered_batch_idx.unwrap(),
buffered_batch_idx,
&right_indices,
)?;

Expand All @@ -1548,7 +1550,7 @@ impl SortMergeJoinStream {
) {
let right_cols = fetch_right_columns_by_idxs(
&self.buffered_data,
chunk.buffered_batch_idx.unwrap(),
buffered_batch_idx,
&right_indices,
)?;

Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
# to compile this workspace and run CI jobs.

[toolchain]
channel = "1.92.0"
channel = "1.93.0"
components = ["rustfmt", "clippy"]