Skip to content

Commit

Permalink
[SPARK-45283][CORE][TESTS] Make StatusTrackerSuite less fragile
Browse files Browse the repository at this point in the history
### Why are the changes needed?

It's discovered from [Github Actions](https://github.com/xiongbo-sjtu/spark/actions/runs/6270601155/job/17028788767) that StatusTrackerSuite can run into random failures, as shown by the following error message.  The proposed fix is to update the unit test to remove the nondeterministic behavior.

```
    [info] StatusTrackerSuite:
    [info] - basic status API usage (99 milliseconds)
    [info] - getJobIdsForGroup() (56 milliseconds)
    [info] - getJobIdsForGroup() with takeAsync() (48 milliseconds)
    [info] - getJobIdsForGroup() with takeAsync() across multiple partitions (58 milliseconds)
    [info] - getJobIdsForTag() *** FAILED *** (10 seconds, 77 milliseconds)
    [info] The code passed to eventually never returned normally.
           Attempted 651 times over 10.005059944000001 seconds.
           Last failure message: Set(3, 2, 1) was not equal to Set(1, 2). (StatusTrackerSuite.scala:148)
```

Full trace can be found [here](https://issues.apache.org/jira/browse/SPARK-45283).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
build/mvn package -DskipTests -pl core
build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.StatusTrackerSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43194 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
Bo Xiong authored and Mridul Muralidharan committed Oct 4, 2023
1 parent 4f09b6c commit ee2eeb7
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,19 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
}

sc.removeJobTag("tag1")

// takeAsync() across multiple partitions
val thirdJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
val thirdJobId = eventually(timeout(10.seconds)) {
thirdJobFuture.jobIds.head
val thirdJobIds = eventually(timeout(10.seconds)) {
// Wait for the two jobs triggered by takeAsync
thirdJobFuture.jobIds.size should be(2)
thirdJobFuture.jobIds
}
eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForTag("tag1").toSet should be (
Set(firstJobId, secondJobId))
sc.statusTracker.getJobIdsForTag("tag2").toSet should be (
Set(secondJobId, thirdJobId))
Set(secondJobId) ++ thirdJobIds)
}
}
}

0 comments on commit ee2eeb7

Please sign in to comment.