add datafusion comet support
menishmueli committed Dec 17, 2024
1 parent a0dc9c6 commit 7910261
Showing 5 changed files with 78 additions and 1 deletion.
1 change: 1 addition & 0 deletions spark-plugin/build.sbt
@@ -97,6 +97,7 @@ lazy val example_3_5_1 = (project in file("example_3_5_1"))
libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0",
libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.17",
libraryDependencies += "org.apache.datafusion" % "comet-spark-spark3.5_2.12" % "0.4.0",
publish / skip := true
).dependsOn(plugin)

50 changes: 50 additions & 0 deletions spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala
@@ -0,0 +1,50 @@
package io.dataflint.example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object DataFusionCometExample extends App {
def df(spark: SparkSession): DataFrame = spark.read
.format("csv")
.option("sep", ";")
.option("inferSchema", true)
.load("./test_data/will_play_text.csv")
.toDF("line_id", "play_name", "speech_number", "line_number", "speaker", "text_entry")
.repartition(1000)

val spark = SparkSession
.builder()
.appName("DataFusionCometExample")
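// Register the Comet plugin alongside DataFlint. Per the Comet setup docs,
// Comet also needs its shuffle manager and off-heap memory enabled, and
// spark.comet.explainFallback.enabled logs why operators fall back to Spark.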
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin,org.apache.spark.CometPlugin")
.config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
.config("spark.comet.explainFallback.enabled", "true")
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "16g")
.config("spark.ui.port", "10000")
.config("spark.dataflint.telemetry.enabled", value = false)
.config("spark.sql.maxMetadataStringLength", "10000")
.master("local[*]")
.getOrCreate()

import spark.implicits._

val shakespeareText = df(spark)

shakespeareText.printSchema()

val count = shakespeareText.count()
println(s"number of records : $count")

val uniqueSpeakers = shakespeareText.select($"speaker").filter($"line_id".isNotNull).distinct().count()
println(s"number of unique speakers : $uniqueSpeakers")

val uniqueWords = shakespeareText.select(explode(split($"text_entry", " "))).distinct().count()

println(s"number of unique words : $uniqueWords")


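// Local TPC-DS "store_sales" data (generated with spark-sql-perf); the path is specific to the author's machine.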
spark.read.load("/Users/menishmueli/Documents/GitHub/spark-sql-perf/data/store_sales").filter($"ss_quantity" > 1).count()

scala.io.StdIn.readLine()
spark.stop()
}
2 changes: 2 additions & 0 deletions spark-ui/src/reducers/PlanParsers/FilterParser.ts
@@ -10,6 +10,8 @@ export function parseFilter(input: string): ParseFilterPlan {
filterStr = removeFromStart(filterStr, "Filter ");
filterStr = removeFromStart(filterStr, "PhotonFilter ");
filterStr = removeFromStart(filterStr, "GpuFilter ");
filterStr = removeFromStart(filterStr, "CometFilter ");

if (filterStr.startsWith("(")) {
filterStr = removeFromStart(filterStr, "(");
filterStr = removeFromEnd(filterStr, ")");
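For context, parseFilter strips whichever engine-specific prefix is present and then the surrounding parentheses, so CometFilter plans flow through the same parsing path as vanilla Filter nodes. Below is a minimal, self-contained sketch of that idea; the helper implementations are assumptions re-created here for completeness, not the repo's exact ones:

// Sketch: strip a known engine prefix, then surrounding parentheses,
// leaving just the filter condition. Helpers are assumed re-implementations.
function removeFromStart(str: string, prefix: string): string {
  return str.startsWith(prefix) ? str.slice(prefix.length) : str;
}

function removeFromEnd(str: string, suffix: string): string {
  return str.endsWith(suffix) ? str.slice(0, str.length - suffix.length) : str;
}

function parseFilterSketch(input: string): { condition: string } {
  let filterStr = input;
  for (const prefix of ["Filter ", "PhotonFilter ", "GpuFilter ", "CometFilter "]) {
    filterStr = removeFromStart(filterStr, prefix);
  }
  if (filterStr.startsWith("(")) {
    filterStr = removeFromStart(filterStr, "(");
    filterStr = removeFromEnd(filterStr, ")");
  }
  return { condition: filterStr };
}

// A Comet filter now parses exactly like a vanilla one:
console.log(parseFilterSketch("CometFilter (ss_quantity#1 > 1)"));
// -> { condition: "ss_quantity#1 > 1" }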
6 changes: 6 additions & 0 deletions spark-ui/src/reducers/SqlReducer.ts
@@ -94,6 +94,7 @@ export function parseNodePlan(
switch (node.nodeName) {
case "PhotonGroupingAgg":
case "GpuHashAggregate":
case "!CometGpuHashAggregate":
case "HashAggregate":
return {
type: "HashAggregate",
@@ -117,25 +118,30 @@
};
case "PhotonFilter":
case "GpuFilter":
case "CometFilter":
case "Filter":
return {
type: "Filter",
plan: parseFilter(plan.planDescription),
};
case "Exchange":
case "CometExchange":
case "CometColumnarExchange":
case "GpuColumnarExchange":
return {
type: "Exchange",
plan: parseExchange(plan.planDescription),
};
case "PhotonProject":
case "GpuProject":
case "CometFilter":
case "Project":
return {
type: "Project",
plan: parseProject(plan.planDescription),
};
case "GpuSort":
case "CometSort":
case "Sort":
return {
type: "Sort",
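The pattern throughout parseNodePlan is that each Comet node name is added as a fall-through case ahead of its vanilla Spark counterpart, so every engine shares one parser per logical node. A self-contained sketch of that dispatch, with a stub standing in for the repo's real parseExchange and an assumed return shape:

// Sketch of the fall-through dispatch used above.
type ParsedNodePlan =
  | { type: "Exchange"; plan: string }
  | { type: "Sort"; plan: string };

// Stub: the real parseExchange extracts partitioning details; this one
// just passes the description through.
function parseExchangeStub(description: string): string {
  return description;
}

function parseNodePlanSketch(
  nodeName: string,
  planDescription: string,
): ParsedNodePlan | undefined {
  switch (nodeName) {
    // Vanilla, Comet, and RAPIDS exchanges all share one parser.
    case "Exchange":
    case "CometExchange":
    case "CometColumnarExchange":
    case "GpuColumnarExchange":
      return { type: "Exchange", plan: parseExchangeStub(planDescription) };
    // Same idea for sorts.
    case "Sort":
    case "CometSort":
    case "GpuSort":
      return { type: "Sort", plan: planDescription };
    default:
      return undefined; // unknown nodes are left unparsed
  }
}

console.log(parseNodePlanSketch("CometExchange", "CometExchange hashpartitioning(speaker, 200)"));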
20 changes: 19 additions & 1 deletion spark-ui/src/reducers/SqlReducerUtils.ts
@@ -125,6 +125,15 @@ const nodeTypeDict: Record<string, NodeType> = {
GpuSort: "sort",
GpuShuffledSymmetricHashJoin: "join",
GpuBroadcastNestedLoopJoin: "join",
CometColumnarExchange: "shuffle",
CometHashAggregate: "transformation",
CometExchange: "shuffle",
CometProject: "transformation",
CometFilter: "transformation",
CometSort: "sort",
CometHashJoin: "join",
CometBroadcastHashJoin: "join",
CometSortMergeJoin: "join",
};

const nodeRenamerDict: Record<string, string> = {
@@ -173,6 +182,15 @@ const nodeRenamerDict: Record<string, string> = {
GpuSort: "Sort (RAPIDS)",
GpuShuffledSymmetricHashJoin: "Join (Shuffled Symmetric Hash) (RAPIDS)",
GpuBroadcastNestedLoopJoin: "Join (Broadcast Nested Loop) (RAPIDS)",
CometProject: "Project (Comet)",
CometHashAggregate: "Aggregate (Comet)",
CometExchange: "Exchange (Comet)",
CometColumnarExchange: "Columnar Exchange (Comet)",
CometFilter: "Filter (Comet)",
CometSort: "Sort (Comet)",
CometHashJoin: "Join (Comet)",
CometBroadcastHashJoin: "Join (Broadcast Hash) (Comet)",
CometSortMergeJoin: "Join (Sort Merge) (Comet)",
};

export function extractTotalFromStatisticsMetric(
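Both tables are plain lookups keyed by the raw plan node name: nodeTypeDict classifies a node for the UI, and nodeRenamerDict supplies its display label. A small sketch of how such tables are typically consumed; the NodeType union and the fallback choices here are illustrative assumptions, not the repo's exact logic:

// Sketch of consuming the two lookup tables above.
type NodeType = "shuffle" | "transformation" | "sort" | "join" | "other";

const nodeTypeDictSketch: Record<string, NodeType> = {
  CometExchange: "shuffle",
  CometFilter: "transformation",
  CometSortMergeJoin: "join",
};

const nodeRenamerDictSketch: Record<string, string> = {
  CometExchange: "Exchange (Comet)",
  CometFilter: "Filter (Comet)",
  CometSortMergeJoin: "Join (Sort Merge) (Comet)",
};

function describeNode(nodeName: string): { type: NodeType; displayName: string } {
  return {
    type: nodeTypeDictSketch[nodeName] ?? "other",            // unclassified node kinds
    displayName: nodeRenamerDictSketch[nodeName] ?? nodeName, // fall back to the raw name
  };
}

console.log(describeNode("CometExchange"));
// -> { type: "shuffle", displayName: "Exchange (Comet)" }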
Expand Down Expand Up @@ -228,7 +246,7 @@ export function nodeEnrichedNameBuilder(
} else if (plan.plan.type === "SinglePartition") {
return "Repartition To Single Partition";
} else if (plan.plan.type === "RoundRobinPartitioning") {
return "Repartition By Round Robin";
return "Repartition By Round Robin (Comet)";
}
}
}
