Skip to content

Commit

Permalink
add additional metadata to SparkMetadataStore
Browse files Browse the repository at this point in the history
  • Loading branch information
menishmueli committed Jan 11, 2025
1 parent fd34b10 commit cc2d621
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ import org.apache.spark.status.api.v1

case class SparkMetadataMetrics(
containerMemoryGb: Double,
executorJvmMemoryGb: Double,
totalInputBytes: Long,
totalOutputBytes: Long,
totalSpillBytes: Long,
totalShuffleWriteBytes: Long,
totalShuffleReadBytes: Long,
executorPeakMemoryBytes: Long,
containerPeakMemoryBytes: Long,
executorJvmMemoryUsage: Double,
containerMemoryUsage: Double,
totalDCU: Double,
coreHourUsage: Double,
memoryGbHour: Double,
isAnySqlQueryFailed: Boolean,
taskErrorRate: Double,
CoresWastedRatio: Double,
executorsDurationMs: Long,
driverDurationMs: Long
driverDurationMs: Long,

)

case class SparkMetadataStore(version: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,31 @@ class StoreMetadataExtractor(store: AppStatusStore, sqlStore: SQLAppStatusStore,

val allExecutors = store.executorList(false)
val onlyExecutors = store.executorList(false).filter(_.id != "driver")
// in standalone mode, when allExecutors.length, the driver is part of the compute. If not, the driver is not taking tasks and not part of the compute.
val computeExecutors = if (allExecutors.length == 1) allExecutors else onlyExecutors
val driver = store.executorList(false).find(_.id == "driver").head
val sqlQueries = sqlStore.executionsList()
val stages = store.stageList(null)

val totalInputBytes = allExecutors.map(_.totalInputBytes).sum
val peakExecutorsMemory = onlyExecutors
val peakJvmExecutorsMemory = onlyExecutors
.filter(_.peakMemoryMetrics.isDefined)
.map(_.peakMemoryMetrics.get)
.map(mem => mem.getMetricValue("JVMHeapMemory") + mem.getMetricValue("JVMOffHeapMemory"))
val executorPeakMemoryBytes = if(peakExecutorsMemory.isEmpty) 0 else peakExecutorsMemory.max
.map(mem => mem.getMetricValue("JVMHeapMemory"))
val peakContainerMemory = onlyExecutors
.filter(_.peakMemoryMetrics.isDefined)
.map(_.peakMemoryMetrics.get)
.map(mem => mem.getMetricValue("JVMOffHeapMemory") + mem.getMetricValue("JVMOffHeapMemory"))


val executorJvmPeakMemoryBytes = if(peakJvmExecutorsMemory.isEmpty) 0 else peakJvmExecutorsMemory.max
val containerPeakMemoryBytes = if(peakContainerMemory.isEmpty) 0 else peakContainerMemory.max

val executorMemoryConf = conf.get(EXECUTOR_MEMORY)
val memoryOverheadConf = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(386.toLong).toDouble
val memoryOverheadFactorConf = (executorMemoryConf * conf.getOption("spark.kubernetes.memoryOverheadFactor").map(_.toDouble).getOrElse(conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)))
val memoryOverhead = Math.max(memoryOverheadConf, memoryOverheadFactorConf)
val executorJvmMemoryGb = executorMemoryConf.toDouble / 1024
val containerMemoryGb = (executorMemoryConf + memoryOverhead).toDouble / 1024
val driverMemoryGb = conf.get(DRIVER_MEMORY).toDouble / 1024

Expand All @@ -94,23 +104,30 @@ class StoreMetadataExtractor(store: AppStatusStore, sqlStore: SQLAppStatusStore,
val failedTasks = stages.map(stage => stage.numFailedTasks).sum
val taskErrorRate = calculatePercentage(failedTasks.toDouble, totalTasks.toDouble)

val totalTasksSlotsMs = allExecutors.map(exec => executorDurationMs(exec).toDouble * exec.maxTasks).sum
val totalTaskTime = stages.map(stage => stage.executorRunTime).sum
val totalTasksSlotsMs = computeExecutors.map(exec => executorDurationMs(exec).toDouble * exec.maxTasks).sum
val totalTaskTime = computeExecutors.map(_.totalDuration).sum
val CoresWastedRatio = 100 - calculatePercentage(totalTaskTime, totalTasksSlotsMs)

val totalShuffleWriteBytes = onlyExecutors.map(exec => exec.totalShuffleWrite).sum
val totalShuffleReadBytes = onlyExecutors.map(exec => exec.totalShuffleRead).sum

val isAnySqlQueryFailed = sqlQueries.exists(query => query.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED })

val containerMemoryUsage = calculatePercentage(containerPeakMemoryBytes, containerMemoryGb * 1024 * 1024 * 1024)
val executorJvmMemoryUsage = calculatePercentage(executorJvmPeakMemoryBytes, executorJvmMemoryGb * 1024 * 1024 * 1024)

SparkMetadataMetrics(
containerMemoryGb = containerMemoryGb,
executorJvmMemoryGb = executorJvmMemoryGb,
containerPeakMemoryBytes = containerPeakMemoryBytes,
executorPeakMemoryBytes = executorJvmPeakMemoryBytes,
containerMemoryUsage = containerMemoryUsage,
executorJvmMemoryUsage = executorJvmMemoryUsage,
totalInputBytes = totalInputBytes,
totalOutputBytes = totalOutputBytes,
totalSpillBytes = totalSpillBytes,
totalShuffleWriteBytes = totalShuffleWriteBytes,
totalShuffleReadBytes = totalShuffleReadBytes,
executorPeakMemoryBytes = executorPeakMemoryBytes,
coreHourUsage = coreHourUsage,
memoryGbHour = memoryGbHour,
totalDCU = totalDCU,
Expand Down

0 comments on commit cc2d621

Please sign in to comment.