[SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` with `scala.collection.LazyZip2`

### What changes were proposed in this pull request?
Since Scala 2.13.0, `scala.runtime.Tuple2Zipped` has been deprecated and `scala.collection.LazyZip2` is recommended instead. This PR replaces the deprecated `.zipped` calls with `.lazyZip`.
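
For illustration, a minimal before/after sketch of the migration on Scala 2.13 (the `keys`/`values` names here are hypothetical and not taken from the changed files):

```scala
val keys   = Seq("a", "b", "c")
val values = Seq(1, 2, 3)

// Deprecated since Scala 2.13: the `.zipped` extension on a pair returns
// scala.runtime.Tuple2Zipped.
(keys, values).zipped.foreach((k, v) => println(s"$k -> $v"))

// Recommended replacement: `.lazyZip` returns scala.collection.LazyZip2, which
// combines the two collections element-wise without building an intermediate
// collection of tuples.
keys.lazyZip(values).foreach((k, v) => println(s"$k -> $v"))
```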

### Why are the changes needed?
`scala.runtime.Tuple2Zipped` is deprecated in Scala 2.13, so its usages should be replaced with `scala.collection.LazyZip2`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43351 from beliefer/SPARK-45513.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Jiaan Geng <beliefer@163.com>
beliefer committed Oct 14, 2023
1 parent d4ceedd commit 6f46ea2
Showing 12 changed files with 45 additions and 33 deletions.
@@ -41,7 +41,7 @@ trait SQLHelper {
None
}
}
- (keys, values).zipped.foreach { (k, v) =>
+ keys.lazyZip(values).foreach { (k, v) =>
if (spark.conf.isModifiable(k)) {
spark.conf.set(k, v)
} else {
@@ -385,7 +385,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L, 110L)

def max(a: Array[Long], b: Array[Long]): Array[Long] =
- (a, b).zipped.map(Math.max).toArray
+ a.lazyZip(b).map(Math.max).toArray

// calculated metric peaks per stage per executor
// metrics sent during stage 0 for each executor
@@ -82,7 +82,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
pushedBlocks ++= blocks
val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
- (blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+ blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => {
blockPushListener.onBlockPushSuccess(blockId, buffer)
})
})
@@ -91,7 +91,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
private def verifyPushRequests(
pushRequests: Seq[PushRequest],
expectedSizes: Seq[Int]): Unit = {
- (pushRequests, expectedSizes).zipped.foreach((req, size) => {
+ pushRequests.lazyZip(expectedSizes).foreach((req, size) => {
assert(req.size == size)
})
}
@@ -256,7 +256,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
// blocks to be deferred
blockPushListener.onBlockPushSuccess(blocks(0), managedBuffers(0))
} else {
- (blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+ blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => {
blockPushListener.onBlockPushSuccess(blockId, buffer)
})
}
@@ -54,7 +54,7 @@ class ElementwiseProductSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after hadamard product")

- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(data2(0) ~== Vectors.sparse(3, Seq((1, 0.0), (2, -1.5))) absTol 1E-5)
}
}
@@ -49,7 +49,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after normalization.")

- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(brzNorm(data1(0).asBreeze, 1) ~== 1.0 absTol 1E-5)
assert(brzNorm(data1(2).asBreeze, 1) ~== 1.0 absTol 1E-5)
@@ -76,7 +76,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after normalization.")

- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(brzNorm(data2(0).asBreeze, 2) ~== 1.0 absTol 1E-5)
assert(brzNorm(data2(2).asBreeze, 2) ~== 1.0 absTol 1E-5)
@@ -103,7 +103,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after normalization.")

- assert((dataInf, dataInfRDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(dataInf.lazyZip(dataInfRDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(dataInf(0).toArray.map(math.abs).max ~== 1.0 absTol 1E-5)
assert(dataInf(2).toArray.map(math.abs).max ~== 1.0 absTol 1E-5)
@@ -105,9 +105,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")

- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -169,9 +169,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")

- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -225,9 +225,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")

- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -274,9 +274,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")

- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -71,7 +71,7 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
// Since the cost function is convex, the loss is guaranteed to be monotonically decreasing
// with L-BFGS optimizer.
// (SGD doesn't guarantee this, and the loss will be fluctuating in the optimization process.)
- assert((loss, loss.tail).zipped.forall(_ > _), "loss should be monotonically decreasing.")
+ assert(loss.lazyZip(loss.tail).forall(_ > _), "loss should be monotonically decreasing.")

val stepSize = 1.0
// Well, GD converges slower, so it requires more iterations!
@@ -311,7 +311,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
var newLeft = left
var newRight = right
- (leftKeys, rightKeys).zipped.foreach((l, r) => {
+ leftKeys.lazyZip(rightKeys).foreach((l, r) => {
// Check if:
// 1. There is already a DPP filter on the key
// 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
@@ -45,7 +45,7 @@ trait SQLHelper {
None
}
}
- (keys, values).zipped.foreach { (k, v) =>
+ keys.lazyZip(values).foreach { (k, v) =>
if (SQLConf.isStaticConfigKey(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
@@ -43,7 +43,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
val deltas = if (input.isEmpty) {
Seq.empty[Long]
} else {
- (input.tail, input.init).zipped.map {
+ input.tail.lazyZip(input.init).map {
case (x: Int, y: Int) => (x - y).toLong
case (x: Long, y: Long) => x - y
case other => fail(s"Unexpected input $other")
@@ -80,7 +80,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
assertResult(input.head, "The first value is wrong")(columnType.extract(buffer))

- (input.tail, deltas).zipped.foreach { (value, delta) =>
+ input.tail.lazyZip(deltas).foreach { (value, delta) =>
if (math.abs(delta) <= Byte.MaxValue) {
assertResult(delta, "Wrong delta")(buffer.get())
} else {
@@ -968,7 +968,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
val fromCaseClassString = StructType.fromString(caseClassString)
val fromJson = StructType.fromString(jsonString)

- (fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
+ fromCaseClassString.lazyZip(fromJson).foreach { (a, b) =>
assert(a.name == b.name)
assert(a.dataType === b.dataType)
assert(a.nullable === b.nullable)
@@ -467,7 +467,9 @@ class HashedRelationSuite extends SharedSparkSession {

test("LongToUnsafeRowMap: key set iterator on a contiguous array of keys") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (contiguousArray, contiguousRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ contiguousArray.toArray.lazyZip(contiguousRows).map {
+   (i, row) => rowMap.append(i, row)
+ }
var keyIterator = rowMap.keys()
// in sparse mode the keys are unsorted
assert(keyIterator.map(key => key.getLong(0)).toArray.sortWith(_ < _) === contiguousArray)
@@ -479,7 +481,9 @@ class HashedRelationSuite extends SharedSparkSession {

test("LongToUnsafeRowMap: key set iterator on a sparse array with equidistant keys") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (sparseArray, sparseRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ sparseArray.toArray.lazyZip(sparseRows).map {
+   (i, row) => rowMap.append(i, row)
+ }
var keyIterator = rowMap.keys()
assert(keyIterator.map(_.getLong(0)).toArray.sortWith(_ < _) === sparseArray)
rowMap.optimize()
@@ -503,7 +507,9 @@

test("LongToUnsafeRowMap: multiple hasNext calls before calling next() on the key iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ randomArray.toArray.lazyZip(randomRows).map {
+   (i, row) => rowMap.append(i, row)
+ }
val buffer = new ArrayBuffer[Long]()
// hasNext should not change the cursor unless the key was read by a next() call
var keyIterator = rowMap.keys()
@@ -527,7 +533,9 @@

test("LongToUnsafeRowMap: no explicit hasNext calls on the key iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ randomArray.toArray.lazyZip(randomRows).map {
+   (i, row) => rowMap.append(i, row)
+ }
val buffer = new ArrayBuffer[Long]()
// call next() until the buffer is filled with all keys
var keyIterator = rowMap.keys()
@@ -555,7 +563,9 @@

test("LongToUnsafeRowMap: call hasNext at the end of the iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (sparseArray, sparseRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ sparseArray.toArray.lazyZip(sparseRows).map {
+   (i, row) => rowMap.append(i, row)
+ }
var keyIterator = rowMap.keys()
assert(keyIterator.map(key => key.getLong(0)).toArray.sortWith(_ < _) === sparseArray)
assert(keyIterator.hasNext == false)
@@ -570,7 +580,9 @@

test("LongToUnsafeRowMap: random sequence of hasNext and next() calls on the key iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ randomArray.toArray.lazyZip(randomRows).map {
+   (i, row) => rowMap.append(i, row)
+ }
val buffer = new ArrayBuffer[Long]()
// call hasNext or next() at random
var keyIterator = rowMap.keys()
