From 79102612ce1422bce527251dc813927eead84943 Mon Sep 17 00:00:00 2001
From: menishmueli
Date: Tue, 17 Dec 2024 09:27:58 +0200
Subject: [PATCH] add datafusion comet support

---
 spark-plugin/build.sbt                       |  1 +
 .../example/DataFusionCometExample.scala     | 47 ++++++++++++++++++++
 .../src/reducers/PlanParsers/FilterParser.ts |  2 +
 spark-ui/src/reducers/SqlReducer.ts          |  6 +++
 spark-ui/src/reducers/SqlReducerUtils.ts     | 18 ++++++++
 5 files changed, 74 insertions(+)
 create mode 100644 spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala
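
The UI-side changes below teach DataFlint's SQL plan parsers the DataFusion
Comet node names (CometFilter, CometProject, CometExchange, CometSort, the
Comet joins, and so on), and the new example app runs Comet side by side with
the DataFlint plugin. The key settings, copied from
DataFusionCometExample.scala in this patch, are:

    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin,org.apache.spark.CometPlugin")
    .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.comet.explainFallback.enabled", "true")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "16g")

Comet requires Spark's off-heap memory to be enabled, hence the offHeap
settings; spark.comet.explainFallback.enabled is meant to log the reason
whenever an operator falls back from Comet to regular Spark execution.
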
diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt
index dc7f18e..43433be 100644
--- a/spark-plugin/build.sbt
+++ b/spark-plugin/build.sbt
@@ -97,6 +97,7 @@ lazy val example_3_5_1 = (project in file("example_3_5_1"))
     libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0",
     libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
     libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.17",
+    libraryDependencies += "org.apache.datafusion" % "comet-spark-spark3.5_2.12" % "0.4.0",
     publish / skip := true
   ).dependsOn(plugin)
diff --git a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala
new file mode 100644
index 0000000..0823dbd
--- /dev/null
+++ b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala
@@ -0,0 +1,47 @@
+package io.dataflint.example
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+object DataFusionCometExample extends App {
+  def df(spark: SparkSession): DataFrame = spark.read
+    .format("csv")
+    .option("sep", ";")
+    .option("inferSchema", true)
+    .load("./test_data/will_play_text.csv")
+    .toDF("line_id", "play_name", "speech_number", "line_number", "speaker", "text_entry")
+    .repartition(1000)
+
+  val spark = SparkSession
+    .builder()
+    .appName("DataFusionCometExample")
+    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin,org.apache.spark.CometPlugin")
+    .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+    .config("spark.comet.explainFallback.enabled", "true")
+    .config("spark.memory.offHeap.enabled", "true")
+    .config("spark.memory.offHeap.size", "16g")
+    .config("spark.ui.port", "10000")
+    .config("spark.dataflint.telemetry.enabled", value = false)
+    .config("spark.sql.maxMetadataStringLength", "10000")
+    .master("local[*]")
+    .getOrCreate()
+
+  import spark.implicits._
+
+  val shakespeareText = df(spark)
+
+  shakespeareText.printSchema()
+
+  val count = shakespeareText.count()
+  println(s"number of records : $count")
+
+  val uniqueSpeakers = shakespeareText.select($"speaker").filter($"line_id".isNotNull).distinct().count()
+  println(s"number of unique speakers : $uniqueSpeakers")
+
+  val uniqueWords = shakespeareText.select(explode(split($"text_entry", " "))).distinct().count()
+
+  println(s"number of unique words : $uniqueWords")
+
+  scala.io.StdIn.readLine()
+  spark.stop()
+}
diff --git a/spark-ui/src/reducers/PlanParsers/FilterParser.ts b/spark-ui/src/reducers/PlanParsers/FilterParser.ts
index d7af6d6..f0efa3a 100644
--- a/spark-ui/src/reducers/PlanParsers/FilterParser.ts
+++ b/spark-ui/src/reducers/PlanParsers/FilterParser.ts
@@ -10,6 +10,8 @@ export function parseFilter(input: string): ParseFilterPlan {
   filterStr = removeFromStart(filterStr, "Filter ");
   filterStr = removeFromStart(filterStr, "PhotonFilter ");
   filterStr = removeFromStart(filterStr, "GpuFilter ");
+  filterStr = removeFromStart(filterStr, "CometFilter ");
+
   if (filterStr.startsWith("(")) {
     filterStr = removeFromStart(filterStr, "(");
     filterStr = removeFromEnd(filterStr, ")");
diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index 5fb040f..1931f2b 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -94,6 +94,7 @@ export function parseNodePlan(
   switch (node.nodeName) {
     case "PhotonGroupingAgg":
     case "GpuHashAggregate":
+    case "CometHashAggregate":
     case "HashAggregate":
       return {
         type: "HashAggregate",
@@ -117,12 +118,15 @@
       };
     case "PhotonFilter":
     case "GpuFilter":
+    case "CometFilter":
     case "Filter":
       return {
         type: "Filter",
         plan: parseFilter(plan.planDescription),
       };
     case "Exchange":
+    case "CometExchange":
+    case "CometColumnarExchange":
     case "GpuColumnarExchange":
       return {
         type: "Exchange",
@@ -130,12 +134,14 @@
       };
     case "PhotonProject":
     case "GpuProject":
+    case "CometProject":
     case "Project":
       return {
         type: "Project",
         plan: parseProject(plan.planDescription),
       };
     case "GpuSort":
+    case "CometSort":
     case "Sort":
       return {
         type: "Sort",
diff --git a/spark-ui/src/reducers/SqlReducerUtils.ts b/spark-ui/src/reducers/SqlReducerUtils.ts
index 0e02d7b..1b37f61 100644
--- a/spark-ui/src/reducers/SqlReducerUtils.ts
+++ b/spark-ui/src/reducers/SqlReducerUtils.ts
@@ -125,6 +125,15 @@ const nodeTypeDict: Record<string, string> = {
   GpuSort: "sort",
   GpuShuffledSymmetricHashJoin: "join",
   GpuBroadcastNestedLoopJoin: "join",
+  CometColumnarExchange: "shuffle",
+  CometHashAggregate: "transformation",
+  CometExchange: "shuffle",
+  CometProject: "transformation",
+  CometFilter: "transformation",
+  CometSort: "sort",
+  CometHashJoin: "join",
+  CometBroadcastHashJoin: "join",
+  CometSortMergeJoin: "join",
 };
 
 const nodeRenamerDict: Record<string, string> = {
@@ -173,6 +182,15 @@
   GpuSort: "Sort (RAPIDS)",
   GpuShuffledSymmetricHashJoin: "Join (Shuffled Symmetric Hash) (RAPIDS)",
   GpuBroadcastNestedLoopJoin: "Join (Broadcast Nested Loop) (RAPIDS)",
+  CometProject: "Project (Comet)",
+  CometHashAggregate: "Aggregate (Comet)",
+  CometExchange: "Exchange (Comet)",
+  CometColumnarExchange: "Columnar Exchange (Comet)",
+  CometFilter: "Filter (Comet)",
+  CometSort: "Sort (Comet)",
+  CometHashJoin: "Join (Hash) (Comet)",
+  CometBroadcastHashJoin: "Join (Broadcast Hash) (Comet)",
+  CometSortMergeJoin: "Join (Sort Merge) (Comet)",
 };
 
 export function extractTotalFromStatisticsMetric(
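
To try the example locally, something like the following should work from the
spark-plugin directory (the sbt project id comes from build.sbt; the exact
invocation may differ per environment):

    sbt "example_3_5_1/runMain io.dataflint.example.DataFusionCometExample"

The app blocks on scala.io.StdIn.readLine() at the end, so the Spark UI
(spark.ui.port is set to 10000) stays up for inspection until you press Enter.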