Skip to content

Commit

Permalink
[SPARK-49434][SPARK-49435][CONNECT][SQL] Move aggregators to sql/api
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR moves all user facing Aggregators from sql/core to sql/api.

### Why are the changes needed?
We are create a unifies Scala SQL interface. This is part of that effort.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48267 from hvanhovell/SPARK-49434.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
hvanhovell authored and dongjoon-hyun committed Sep 28, 2024
1 parent 47d2c9c commit 8dfecc1
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 33 deletions.
5 changes: 5 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.avro.functions$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions$"),

// SPARK-49434: Move aggregators to sql/api
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.javalang.typed"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed$"),
) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.execution.aggregate.TypedAverage;
import org.apache.spark.sql.execution.aggregate.TypedCount;
import org.apache.spark.sql.execution.aggregate.TypedSumDouble;
import org.apache.spark.sql.execution.aggregate.TypedSumLong;
import org.apache.spark.sql.internal.TypedAverage;
import org.apache.spark.sql.internal.TypedCount;
import org.apache.spark.sql.internal.TypedSumDouble;
import org.apache.spark.sql.internal.TypedSumLong;

/**
* Type-safe functions available for {@link org.apache.spark.sql.Dataset} operations in Java.
* Type-safe functions available for {@link org.apache.spark.sql.api.Dataset} operations in Java.
*
* Scala users should use {@link org.apache.spark.sql.expressions.scalalang.typed}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
package org.apache.spark.sql.expressions

import org.apache.spark.SparkException
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, ProductEncoder}
import org.apache.spark.sql.{Encoder, Encoders}

/**
* An aggregator that uses a single associative and commutative reduce function. This reduce
* function can be used to go through all input values and reduces them to a single value.
* If there is no input, a null value is returned.
* function can be used to go through all input values and reduces them to a single value. If
* there is no input, a null value is returned.
*
* This class currently assumes there is at least one input row.
*/
private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T)
extends Aggregator[T, (Boolean, T), T] {
extends Aggregator[T, (Boolean, T), T] {

@transient private val encoder = implicitly[Encoder[T]]

Expand All @@ -47,10 +45,8 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T)

override def zero: (Boolean, T) = (false, _zero.asInstanceOf[T])

override def bufferEncoder: Encoder[(Boolean, T)] = {
ProductEncoder.tuple(Seq(PrimitiveBooleanEncoder, encoder.asInstanceOf[AgnosticEncoder[T]]))
.asInstanceOf[Encoder[(Boolean, T)]]
}
override def bufferEncoder: Encoder[(Boolean, T)] =
Encoders.tuple(Encoders.scalaBoolean, encoder)

override def outputEncoder: Encoder[T] = encoder

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.sql.expressions.scalalang

import org.apache.spark.sql._
import org.apache.spark.sql.execution.aggregate._
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.internal.{TypedAverage, TypedCount, TypedSumDouble, TypedSumLong}

/**
* Type-safe functions available for `Dataset` operations in Scala.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,24 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.aggregate
package org.apache.spark.sql.internal

import org.apache.spark.api.java.function.MapFunction
import org.apache.spark.sql.{Encoder, TypedColumn}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines internal implementations for aggregators.
////////////////////////////////////////////////////////////////////////////////////////////////////


class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
override def zero: Double = 0.0
override def reduce(b: Double, a: IN): Double = b + f(a)
override def merge(b1: Double, b2: Double): Double = b1 + b2
override def finish(reduction: Double): Double = reduction

override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

// Java api support
def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
Expand All @@ -44,15 +42,14 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
}
}


class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
override def zero: Long = 0L
override def reduce(b: Long, a: IN): Long = b + f(a)
override def merge(b1: Long, b2: Long): Long = b1 + b2
override def finish(reduction: Long): Long = reduction

override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
override def outputEncoder: Encoder[Long] = Encoders.scalaLong

// Java api support
def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
Expand All @@ -62,7 +59,6 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
}
}


class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
override def zero: Long = 0
override def reduce(b: Long, a: IN): Long = {
Expand All @@ -71,8 +67,8 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
override def merge(b1: Long, b2: Long): Long = b1 + b2
override def finish(reduction: Long): Long = reduction

override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
override def outputEncoder: Encoder[Long] = Encoders.scalaLong

// Java api support
def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
Expand All @@ -81,7 +77,6 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
}
}


class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
override def zero: (Double, Long) = (0.0, 0L)
override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
Expand All @@ -90,8 +85,10 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
(b1._1 + b2._1, b1._2 + b2._2)
}

override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
override def bufferEncoder: Encoder[(Double, Long)] =
Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)

override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

// Java api support
def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class ColumnNodeToExpressionConverterSuite extends SparkFunSuite {
a.asInstanceOf[AgnosticEncoder[Any]]

test("udf") {
val int2LongSum = new aggregate.TypedSumLong[Int]((i: Int) => i.toLong)
val int2LongSum = new TypedSumLong[Int]((i: Int) => i.toLong)
val bufferEncoder = encoderFor(int2LongSum.bufferEncoder)
val outputEncoder = encoderFor(int2LongSum.outputEncoder)
val bufferAttrs = bufferEncoder.namedExpressions.map {
Expand Down

0 comments on commit 8dfecc1

Please sign in to comment.