Skip to content

Commit

Permalink
add job group extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
menishmueli committed Dec 2, 2024
1 parent f50ad2f commit 3ca6e69
Show file tree
Hide file tree
Showing 6 changed files with 463 additions and 33 deletions.
1 change: 1 addition & 0 deletions spark-plugin/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ lazy val example_3_5_1 = (project in file("example_3_5_1"))
libraryDependencies += "io.delta" %% "delta-spark" % "3.1.0",
libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0",
libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.17",
publish / skip := true
).dependsOn(plugin)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.dataflint.example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object JobGroupExample extends App {
  // Demo app: runs two SQL queries under distinct Spark job groups so the
  // DataFlint plugin can attribute jobs/stages to each group.
  // NOTE(review): the `App` trait has initialization-order pitfalls; acceptable
  // for a demo, but prefer `def main(args: Array[String]): Unit` in real code.
  val spark = SparkSession
    .builder()
    .appName("JobGroupExample")
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
    .config("spark.ui.port", "10000")
    .config("spark.dataflint.telemetry.enabled", value = false)
    .config("spark.eventLog.enabled", "true")
    .config("spark.sql.maxMetadataStringLength", "10000")
    .master("local[*]")
    .getOrCreate()

  // Required for the .toDF conversion on the Seq below.
  import spark.implicits._

  val data = Seq(
    ("Alice", "Math", 85),
    ("Alice", "Physics", 95),
    ("Bob", "Math", 78),
    ("Bob", "Physics", 88),
    ("Charlie", "Math", 92),
    ("Charlie", "Physics", 80)
  ).toDF("name", "subject", "score")

  data.createOrReplaceTempView("student_scores")

  // First query, attributed to job group "queryGroup1".
  spark.sparkContext.setJobGroup("queryGroup1", "Group 1: Math Scores")
  val mathScores = spark.sql("SELECT name, score FROM student_scores WHERE subject = 'Math'")
  mathScores.count()
  // Clear the group so later jobs are not mis-attributed to queryGroup1
  // (mirrors the clearJobGroup() usage in JobGroupTests).
  spark.sparkContext.clearJobGroup()

  // Second query, attributed to job group "queryGroup2".
  spark.sparkContext.setJobGroup("queryGroup2", "Group 2: Average Scores")
  val avgScores = spark.sql("SELECT name, AVG(score) as avg_score FROM student_scores GROUP BY name")
  avgScores.count()
  spark.sparkContext.clearJobGroup()

  // Keep the Spark UI (port 10000) alive for inspection; prompt so the
  // blocking read is not mistaken for a hang.
  println("Press Enter to stop the Spark application...")
  scala.io.StdIn.readLine()

  spark.stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.dataflint.example

import org.apache.spark.sql.SparkSession

object JobGroupExportedLocal extends App {
  // Demo app: same two-job-group scenario as JobGroupExample, but with the
  // DataFlint exporter configured in "local" mode.
  val spark = SparkSession
    .builder()
    .appName("JobGroupExample")
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
    .config("spark.ui.port", "10000")
    .config("spark.dataflint.telemetry.enabled", value = false)
    .config("spark.eventLog.enabled", "true")
    .config("spark.dataflint.mode", "local")
    // SECURITY(review): the fallback token below is a credential-looking value
    // committed to source control. Prefer supplying DATAFLINT_TOKEN via the
    // environment; the literal is kept only as a backward-compatible default
    // for this example. If it is a real token, rotate it.
    .config("spark.dataflint.token", sys.env.getOrElse("DATAFLINT_TOKEN", "AKIAZEUOHHYMKVUKYYZB-1234"))
    .config("spark.sql.maxMetadataStringLength", "10000")
    .master("local[*]")
    .getOrCreate()

  // Required for the .toDF conversion on the Seq below.
  import spark.implicits._

  val data = Seq(
    ("Alice", "Math", 85),
    ("Alice", "Physics", 95),
    ("Bob", "Math", 78),
    ("Bob", "Physics", 88),
    ("Charlie", "Math", 92),
    ("Charlie", "Physics", 80)
  ).toDF("name", "subject", "score")

  data.createOrReplaceTempView("student_scores")

  // First query, attributed to job group "queryGroup1".
  spark.sparkContext.setJobGroup("queryGroup1", "Group 1: Math Scores")
  val mathScores = spark.sql("SELECT name, score FROM student_scores WHERE subject = 'Math'")
  mathScores.count()
  // Clear the group so later jobs are not mis-attributed to queryGroup1
  // (mirrors the clearJobGroup() usage in JobGroupTests).
  spark.sparkContext.clearJobGroup()

  // Second query, attributed to job group "queryGroup2".
  spark.sparkContext.setJobGroup("queryGroup2", "Group 2: Average Scores")
  val avgScores = spark.sql("SELECT name, AVG(score) as avg_score FROM student_scores GROUP BY name")
  avgScores.count()
  spark.sparkContext.clearJobGroup()

  spark.stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.apache.spark.dataflint.jobgroup.tests

import org.apache.spark.dataflint.jobgroup.JobGroupExtractor
import org.apache.spark.sql.SparkSession

class JobGroupTests extends org.scalatest.funsuite.AnyFunSuiteLike {
  test("test job group extractor with 2 groups") {
    val spark = SparkSession
      .builder()
      .appName("JobGroupExample")
      .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
      .config("spark.ui.port", "10000")
      .config("spark.sql.maxMetadataStringLength", "10000")
      .master("local[*]")
      .getOrCreate()

    // Ensure the SparkSession is stopped even when an assertion fails;
    // otherwise a failed test leaks the session into subsequent tests.
    try {
      // Required for the .toDF conversion on the Seq below.
      import spark.implicits._

      val data = Seq(
        ("Alice", "Math", 85),
        ("Alice", "Physics", 95),
        ("Bob", "Math", 78),
        ("Bob", "Physics", 88),
        ("Charlie", "Math", 92),
        ("Charlie", "Physics", 80)
      ).toDF("name", "subject", "score")

      data.createOrReplaceTempView("student_scores")

      // First query, attributed to job group "queryGroup1".
      spark.sparkContext.setJobGroup("queryGroup1", "Group 1: Math Scores")
      val mathScores = spark.sql("SELECT name, score FROM student_scores WHERE subject = 'Math'")
      mathScores.count()

      spark.sparkContext.clearJobGroup()

      // Second query, attributed to job group "queryGroup2".
      spark.sparkContext.setJobGroup("queryGroup2", "Group 2: Average Scores")
      val avgScores = spark.sql("SELECT name, AVG(score) as avg_score FROM student_scores GROUP BY name")
      avgScores.count()

      spark.sparkContext.clearJobGroup()

      // NOTE(review): sleep-based coordination is flaky — presumably this waits
      // for listener-bus events to drain into the status stores; consider
      // sparkContext.listenerBus-flush APIs or polling instead. TODO confirm.
      Thread.sleep(1000)

      // NOTE(review): ui.get assumes the UI is enabled (it is, in this config);
      // would throw NoSuchElementException if spark.ui.enabled were false.
      val extractor = new JobGroupExtractor(spark.sparkContext.ui.get.store, spark.sharedState.statusStore)
      val queryGroup1Store = extractor.extract("queryGroup1")
      val queryGroup2Store = extractor.extract("queryGroup2")

      // Each group ran exactly one SQL execution.
      assert(queryGroup1Store._2.executionsList().length == 1)
      assert(queryGroup2Store._2.executionsList().length == 1)
    } finally {
      spark.stop()
    }
  }

}
Loading

0 comments on commit 3ca6e69

Please sign in to comment.