Skip to content

Commit

Permalink
[SPARK-48155][FOLLOWUP][SQL] AQEPropagateEmptyRelation for left anti …
Browse files Browse the repository at this point in the history
…join should check if remain child is just BroadcastQueryStageExec

### What changes were proposed in this pull request?
As title.

### Why are the changes needed?

We encountered BroadcastNestedLoopJoin LeftAnti BuildLeft, and it's right is empty. It is left child of left outer BroadcastHashJoin. The case is more complicated, part of the Initial Plan is as follows
```
               :- Project (214)
               :  +- BroadcastHashJoin LeftOuter BuildRight (213)
               :     :- BroadcastNestedLoopJoin LeftAnti BuildLeft (211)
               :     :  :- BroadcastExchange (187)
               :     :  :  +- Project (186)
               :     :  :     +- Filter (185)
               :     :  :        +- Scan parquet  (31)
               :     :  +- LocalLimit (210)
               :     :     +- Project (209)
               :     :        +- BroadcastHashJoin Inner BuildLeft (208)
               :     :           :- BroadcastExchange (194)
               :     :           :  +- Project (193)
               :     :           :     +- BroadcastHashJoin LeftOuter BuildRight (192)
               :     :           :        :- Project (189)
               :     :           :        :  +- Filter (188)
               :     :           :        :     +- Scan parquet  (37)
               :     :           :        +- BroadcastExchange (191)
```
After AQEPropagateEmptyRelation, report an error "HashJoin should not take LeftOuter as the JoinType with building left side"

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48300 from zml1206/SPARK-48155-followup.

Authored-by: zml1206 <zhuml1206@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
zml1206 authored and cloud-fan committed Oct 16, 2024
1 parent 60200ae commit f860af6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
case LeftAnti if isRightEmpty | isFalseCondition => p.left
case LeftAnti if (isRightEmpty | isFalseCondition) && canExecuteWithoutJoin(p.left) =>
p.left
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
case LeftOuter | FullOuter if isRightEmpty && canExecuteWithoutJoin(p.left) =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,38 @@ class AdaptiveQueryExecSuite
assert(findTopLevelBroadcastNestedLoopJoin(adaptivePlan).size == 1)
assert(findTopLevelUnion(adaptivePlan).size == 0)
}

withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "100") {
withTempView("t1", "t2", "t3", "t4") {
Seq(1).toDF().createOrReplaceTempView("t1")
spark.range(100).createOrReplaceTempView("t2")
spark.range(2).createOrReplaceTempView("t3")
spark.range(2).createOrReplaceTempView("t4")
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"""
|SELECT tt2.value
|FROM (
| SELECT value
| FROM t1
| WHERE NOT EXISTS (
| SELECT 1
| FROM (
| SELECT t2.id
| FROM t2
| JOIN t3 ON t2.id = t3.id
| AND t2.id > 100
| ) tt
| WHERE t1.value = tt.id
| )
| AND t1.value = 1
|) tt2
| LEFT JOIN t4 ON tt2.value = t4.id
|""".stripMargin
)
assert(findTopLevelBroadcastNestedLoopJoin(adaptivePlan).size == 1)
}
}
}

test("SPARK-39915: Dataset.repartition(N) may not create N partitions") {
Expand Down

0 comments on commit f860af6

Please sign in to comment.