From cc2d621c6adb52d15adca6dc559a57663fce8e0b Mon Sep 17 00:00:00 2001
From: menishmueli
Date: Sat, 11 Jan 2025 21:58:27 +0200
Subject: [PATCH] add additional metadata to SparkMetadataStore

---
 .../dataflint/saas/SparkMetadataStore.scala |  7 ++++-
 .../saas/StoreMetadataExtractor.scala       | 29 +++++++++++++++----
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkMetadataStore.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkMetadataStore.scala
index de4a37c..0e6e64c 100644
--- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkMetadataStore.scala
+++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkMetadataStore.scala
@@ -4,12 +4,16 @@ import org.apache.spark.status.api.v1
 
 case class SparkMetadataMetrics(
   containerMemoryGb: Double,
+  executorJvmMemoryGb: Double,
   totalInputBytes: Long,
   totalOutputBytes: Long,
   totalSpillBytes: Long,
   totalShuffleWriteBytes: Long,
   totalShuffleReadBytes: Long,
   executorPeakMemoryBytes: Long,
+  containerPeakMemoryBytes: Long,
+  executorJvmMemoryUsage: Double,
+  containerMemoryUsage: Double,
   totalDCU: Double,
   coreHourUsage: Double,
   memoryGbHour: Double,
@@ -17,7 +21,8 @@ case class SparkMetadataMetrics(
   taskErrorRate: Double,
   CoresWastedRatio: Double,
   executorsDurationMs: Long,
-  driverDurationMs: Long
+  driverDurationMs: Long,
+
 )
 
 case class SparkMetadataStore(version: String,
diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreMetadataExtractor.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreMetadataExtractor.scala
index 9eb4de6..2ef6624 100644
--- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreMetadataExtractor.scala
+++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreMetadataExtractor.scala
@@ -64,21 +64,31 @@ class StoreMetadataExtractor(store: AppStatusStore, sqlStore: SQLAppStatusStore,
 
     val allExecutors = store.executorList(false)
     val onlyExecutors = store.executorList(false).filter(_.id != "driver")
+    // in standalone mode, when allExecutors.length == 1, the driver is part of the compute; otherwise the driver takes no tasks and is not part of the compute
+    val computeExecutors = if (allExecutors.length == 1) allExecutors else onlyExecutors
     val driver = store.executorList(false).find(_.id == "driver").head
     val sqlQueries = sqlStore.executionsList()
     val stages = store.stageList(null)
 
     val totalInputBytes = allExecutors.map(_.totalInputBytes).sum
-    val peakExecutorsMemory = onlyExecutors
+    val peakJvmExecutorsMemory = onlyExecutors
       .filter(_.peakMemoryMetrics.isDefined)
       .map(_.peakMemoryMetrics.get)
-      .map(mem => mem.getMetricValue("JVMHeapMemory") + mem.getMetricValue("JVMOffHeapMemory"))
-    val executorPeakMemoryBytes = if(peakExecutorsMemory.isEmpty) 0 else peakExecutorsMemory.max
+      .map(mem => mem.getMetricValue("JVMHeapMemory"))
+    val peakContainerMemory = onlyExecutors
+      .filter(_.peakMemoryMetrics.isDefined)
+      .map(_.peakMemoryMetrics.get)
+      .map(mem => mem.getMetricValue("JVMHeapMemory") + mem.getMetricValue("JVMOffHeapMemory"))
+
+
+    val executorJvmPeakMemoryBytes = if(peakJvmExecutorsMemory.isEmpty) 0 else peakJvmExecutorsMemory.max
+    val containerPeakMemoryBytes = if(peakContainerMemory.isEmpty) 0 else peakContainerMemory.max
 
     val executorMemoryConf = conf.get(EXECUTOR_MEMORY)
     val memoryOverheadConf = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(386.toLong).toDouble
     val memoryOverheadFactorConf = (executorMemoryConf * conf.getOption("spark.kubernetes.memoryOverheadFactor").map(_.toDouble).getOrElse(conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)))
     val memoryOverhead = Math.max(memoryOverheadConf, memoryOverheadFactorConf)
+    val executorJvmMemoryGb = executorMemoryConf.toDouble / 1024
     val containerMemoryGb = (executorMemoryConf + memoryOverhead).toDouble / 1024
     val driverMemoryGb = conf.get(DRIVER_MEMORY).toDouble / 1024
 
@@ -94,8 +104,8 @@ class StoreMetadataExtractor(store: AppStatusStore, sqlStore: SQLAppStatusStore,
     val failedTasks = stages.map(stage => stage.numFailedTasks).sum
     val taskErrorRate = calculatePercentage(failedTasks.toDouble, totalTasks.toDouble)
 
-    val totalTasksSlotsMs = allExecutors.map(exec => executorDurationMs(exec).toDouble * exec.maxTasks).sum
-    val totalTaskTime = stages.map(stage => stage.executorRunTime).sum
+    val totalTasksSlotsMs = computeExecutors.map(exec => executorDurationMs(exec).toDouble * exec.maxTasks).sum
+    val totalTaskTime = computeExecutors.map(_.totalDuration).sum
     val CoresWastedRatio = 100 - calculatePercentage(totalTaskTime, totalTasksSlotsMs)
 
     val totalShuffleWriteBytes = onlyExecutors.map(exec => exec.totalShuffleWrite).sum
@@ -103,14 +113,21 @@ class StoreMetadataExtractor(store: AppStatusStore, sqlStore: SQLAppStatusStore,
     val isAnySqlQueryFailed = sqlQueries.exists(query => query.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED })
 
+    val containerMemoryUsage = calculatePercentage(containerPeakMemoryBytes, containerMemoryGb * 1024 * 1024 * 1024)
+    val executorJvmMemoryUsage = calculatePercentage(executorJvmPeakMemoryBytes, executorJvmMemoryGb * 1024 * 1024 * 1024)
+
     SparkMetadataMetrics(
       containerMemoryGb = containerMemoryGb,
+      executorJvmMemoryGb = executorJvmMemoryGb,
+      containerPeakMemoryBytes = containerPeakMemoryBytes,
+      executorPeakMemoryBytes = executorJvmPeakMemoryBytes,
+      containerMemoryUsage = containerMemoryUsage,
+      executorJvmMemoryUsage = executorJvmMemoryUsage,
      totalInputBytes = totalInputBytes,
       totalOutputBytes = totalOutputBytes,
       totalSpillBytes = totalSpillBytes,
       totalShuffleWriteBytes = totalShuffleWriteBytes,
       totalShuffleReadBytes = totalShuffleReadBytes,
-      executorPeakMemoryBytes = executorPeakMemoryBytes,
       coreHourUsage = coreHourUsage,
       memoryGbHour = memoryGbHour,
       totalDCU = totalDCU,
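
A quick way to sanity-check the two new usage metrics is the standalone sketch below. It is a minimal sketch, not part of the patch: calculatePercentage is assumed to behave like the private helper in StoreMetadataExtractor (a plain ratio-to-percent with a zero-denominator guard), and all peak and allocation values are made-up examples. As in the patch, GB-denominated configs are converted to bytes with three factors of 1024 before dividing.

object MemoryUsageSketch {
  // Assumed behavior of StoreMetadataExtractor.calculatePercentage:
  // a ratio expressed as a percentage, guarded against a zero denominator.
  private def calculatePercentage(value: Double, total: Double): Double =
    if (total == 0) 0.0 else (value / total) * 100

  def main(args: Array[String]): Unit = {
    // Hypothetical allocation: 4 GB executor JVM (spark.executor.memory)
    // plus ~0.4 GB memory overhead for the container.
    val executorJvmMemoryGb = 4.0
    val containerMemoryGb   = executorJvmMemoryGb + 0.4

    // Hypothetical peaks: 3 GiB JVM heap, plus 300 MiB off-heap at the container level.
    val executorJvmPeakMemoryBytes = 3L * 1024 * 1024 * 1024
    val containerPeakMemoryBytes   = executorJvmPeakMemoryBytes + 300L * 1024 * 1024

    // Same formulas as the patch: peak bytes over the allocation converted to bytes.
    val executorJvmMemoryUsage =
      calculatePercentage(executorJvmPeakMemoryBytes.toDouble, executorJvmMemoryGb * 1024 * 1024 * 1024)
    val containerMemoryUsage =
      calculatePercentage(containerPeakMemoryBytes.toDouble, containerMemoryGb * 1024 * 1024 * 1024)

    println(f"executorJvmMemoryUsage = $executorJvmMemoryUsage%.1f%%") // 75.0%
    println(f"containerMemoryUsage   = $containerMemoryUsage%.1f%%")   // ~74.8%
  }
}

With these inputs, JVM usage comes out to 75.0% and container usage to roughly 74.8%, which illustrates why the patch reports the two separately: high container usage paired with low JVM usage points at off-heap pressure (and a memoryOverhead that may need tuning) rather than heap pressure.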